On Wed, Mar 18, 2020 at 4:50 PM Alvaro Herrera <alvhe...@2ndquadrant.com> wrote: > > > well, not "forever", but: > > $ make check PROVE_TESTS=t/019_standby_logical_decoding_conflicts.pl PROVE_FLAGS=-v > ... > cd /pgsql/source/master/src/test/recovery && TESTDIR='/home/alvherre/mnt/crypt/alvherre/Code/pgsql/build/master/src/test/recovery' PATH="/pgsql/build/master/tmp_install/pgsql/install/master/bin:$PATH" LD_LIBRARY_PATH="/pgsql/build/master/tmp_install/pgsql/install/master/lib" PGPORT='655432' PG_REGRESS='/home/alvherre/mnt/crypt/alvherre/Code/pgsql/build/master/src/test/recovery/../../../src/test/regress/pg_regress' REGRESS_SHLIB='/pgsql/build/master/src/test/regress/regress.so' /usr/bin/prove -I /pgsql/source/master/src/test/perl/ -I /pgsql/source/master/src/test/recovery -v t/ 019_standby_logical_decoding_conflicts.pl > t/019_standby_logical_decoding_conflicts.pl .. > 1..24 > ok 1 - dropslot on standby created > ok 2 - activeslot on standby created > # poll_query_until timed out executing this query: > # SELECT '0/35C9190' <= replay_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = 'standby'; > # expecting this output: > # t > # last actual query output: > # > # with stderr: > Bailout called. Further testing stopped: system pg_ctl failed > Bail out! system pg_ctl failed > FAILED--Further testing stopped: system pg_ctl failed > make: *** [Makefile:19: check] Error 255 >
After rebasing and making minimal tweaks (duplicated OID, TAP test numbering), I'm facing a similar problem, but in another place: make -C src/test/recovery check PROVE_TESTS=t/023_standby_logical_decoding_conflicts.pl PROVE_FLAGS=-v ... /usr/bin/mkdir -p '/data/src/pg/main/src/test/recovery'/tmp_check cd . && TESTDIR='/data/src/pg/main/src/test/recovery' PATH="/d/src/pg/main/tmp_install/home/fabrizio/pgsql/logical-decoding-standby/bin:$PATH" LD_LIBRARY_PATH="/d/src/pg/main/tmp_install/home/fabrizio/pgsql/logical-decoding-standby/lib" PGPORT='65432' PG_REGRESS='/data/src/pg/main/src/test/recovery/../../../src/test/regress/pg_regress' REGRESS_SHLIB='/d/src/pg/main/src/test/regress/regress.so' /usr/bin/prove -I ../../../src/test/perl/ -I . -v t/023_standby_logical_decoding_conflicts.pl t/023_standby_logical_decoding_conflicts.pl .. 1..24 ok 1 - dropslot on standby created ok 2 - activeslot on standby created not ok 3 - dropslot on standby dropped # Failed test 'dropslot on standby dropped' # at t/023_standby_logical_decoding_conflicts.pl line 67. # got: 'logical' # expected: '' not ok 4 - activeslot on standby dropped # Failed test 'activeslot on standby dropped' # at t/023_standby_logical_decoding_conflicts.pl line 68. # got: 'logical' # expected: '' The TAP test hangs forever in `check_slots_dropped`, exactly here: # our client should've terminated in response to the walsender error eval { $slot_user_handle->finish; }; Regards, -- Fabrízio de Royes Mello PostgreSQL Developer at OnGres Inc. - https://ongres.com
From a3e42f2fc53afd2bcbe6da9d029d91694a34562e Mon Sep 17 00:00:00 2001 From: Amit Khandekar <amit.khande...@enterprisedb.com> Date: Thu, 16 Jan 2020 10:05:15 +0530 Subject: [PATCH v7 1/5] Allow logical decoding on standby. Allow a logical slot to be created on standby. Restrict its usage or its creation if wal_level on primary is less than logical. During slot creation, it's restart_lsn is set to the last replayed LSN. Effectively, a logical slot creation on standby waits for an xl_running_xact record to arrive from primary. Conflicting slots would be handled in next commits. Andres Freund and Amit Khandekar. --- src/backend/access/transam/xlog.c | 11 +++++ src/backend/replication/logical/decode.c | 22 ++++++++- src/backend/replication/logical/logical.c | 37 ++++++++------- src/backend/replication/slot.c | 57 +++++++++++++++-------- src/backend/replication/walsender.c | 10 ++-- src/include/access/xlog.h | 1 + 6 files changed, 98 insertions(+), 40 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8dd225c2e1..609edbaca6 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5038,6 +5038,17 @@ LocalProcessControlFile(bool reset) ReadControlFile(); } +/* + * Get the wal_level from the control file. For a standby, this value should be + * considered as its active wal_level, because it may be different from what + * was originally configured on standby. + */ +WalLevel +GetActiveWalLevel(void) +{ + return ControlFile->wal_level; +} + /* * Initialization of shared memory for XLOG */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 3f84ee99b8..bb7c80d6cc 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -203,11 +203,31 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * can restart from there. 
*/ break; + case XLOG_PARAMETER_CHANGE: + { + xl_parameter_change *xlrec = + (xl_parameter_change *) XLogRecGetData(buf->record); + + /* + * If wal_level on primary is reduced to less than logical, then we + * want to prevent existing logical slots from being used. + * Existing logical slots on standby get dropped when this WAL + * record is replayed; and further, slot creation fails when the + * wal level is not sufficient; but all these operations are not + * synchronized, so a logical slot may creep in while the wal_level + * is being reduced. Hence this extra check. + */ + if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level >= logical on master"))); + break; + } case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: case XLOG_BACKUP_END: - case XLOG_PARAMETER_CHANGE: case XLOG_RESTORE_POINT: case XLOG_FPW_CHANGE: case XLOG_FPI_FOR_HINT: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index f1f4df7d70..ec4f7b95e4 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -110,23 +110,22 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. 
- * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /* + * This check may have race conditions, but whenever + * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we + * verify that there are no existing logical replication slots. And to + * avoid races around creating a new slot, + * CheckLogicalDecodingRequirements() is called once before creating + * the slot, and once when logical decoding is initially starting up. + */ + if (GetActiveWalLevel() < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level >= logical on master"))); + } } /* @@ -295,6 +294,12 @@ CreateInitDecodingContext(const char *plugin, LogicalDecodingContext *ctx; MemoryContext old_context; + /* + * On standby, this check is also required while creating the slot. Check + * the comments in this function. + */ + CheckLogicalDecodingRequirements(); + /* shorter lines... */ slot = MyReplicationSlot; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 9c7cf13d4d..8a0f51907b 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1097,37 +1097,56 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. But on a standby we cannot do WAL writes, so just use the + * replay pointer; effectively, an attempt to create a logical slot on + * standby will cause it to wait for an xl_running_xact record to be + * logged independently on the primary, so that a snapshot can be built + * using the record. * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. 
While that slightly increases - * the chance that we have to retry, it's where a base backup has to - * start replay at. + * None of this is needed (or indeed helpful) for physical slots as + * they'll start replay at the last logged checkpoint anyway. Instead + * return the location of the last redo LSN. While that slightly + * increases the chance that we have to retry, it's where a base backup + * has to start replay at. */ + if (SlotIsPhysical(slot)) + restart_lsn = GetRedoRecPtr(); + else if (RecoveryInProgress()) + { + restart_lsn = GetXLogReplayRecPtr(NULL); + /* + * Replay pointer may point one past the end of the record. If that + * is a XLOG page boundary, it will not be a valid LSN for the + * start of a record, so bump it up past the page header. + */ + if (!XRecOffIsValid(restart_lsn)) + { + if (restart_lsn % XLOG_BLCKSZ != 0) + elog(ERROR, "invalid replay pointer"); + /* For the first page of a segment file, it's a long header */ + if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0) + restart_lsn += SizeOfXLogLongPHD; + else + restart_lsn += SizeOfXLogShortPHD; + } + } + else + restart_lsn = GetXLogInsertRecPtr(); + + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + if (!RecoveryInProgress() && SlotIsLogical(slot)) { XLogRecPtr flushptr; - /* start at current insert position */ - restart_lsn = GetXLogInsertRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); /* and make sure it's fsynced to disk */ XLogFlush(flushptr); } - else - { - restart_lsn = GetRedoRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - } /* prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c 
index d5c9bc31d8..d8308d7d27 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2878,10 +2878,12 @@ XLogSendLogical(void) * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ - if (flushPtr == InvalidXLogRecPtr) - flushPtr = GetFlushRecPtr(); - else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) - flushPtr = GetFlushRecPtr(); + if (flushPtr == InvalidXLogRecPtr || + logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + { + flushPtr = (am_cascading_walsender ? + GetStandbyFlushRecPtr() : GetFlushRecPtr()); + } /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 221af87e71..ddc167d2c9 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -324,6 +324,7 @@ extern Size XLOGShmemSize(void); extern void XLOGShmemInit(void); extern void BootStrapXLOG(void); extern void LocalProcessControlFile(bool reset); +extern WalLevel GetActiveWalLevel(void); extern void StartupXLOG(void); extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); -- 2.25.1
From 91a987c32179a692e8af7be1646746a93d2f8f56 Mon Sep 17 00:00:00 2001 From: Amit Khandekar <amit.khande...@enterprisedb.com> Date: Thu, 16 Jan 2020 10:05:16 +0530 Subject: [PATCH v7 2/5] Add info in WAL records in preparation for logical slot conflict handling. When a WAL replay on standby indicates that a catalog table tuple is to be deleted by an xid that is greater than a logical slot's catalog_xmin, then that means the slot's catalog_xmin conflicts with the xid, and we need to handle the conflict. While subsequent commits will do the actual conflict handling, this commit adds a new field onCatalogTable in such WAL records, that is true for catalog tables, so as to arrange for conflict handling. Andres Freund. --- src/backend/access/gist/gist.c | 2 +- src/backend/access/gist/gistbuild.c | 2 +- src/backend/access/gist/gistutil.c | 4 ++-- src/backend/access/gist/gistxlog.c | 4 +++- src/backend/access/hash/hashinsert.c | 2 ++ src/backend/access/heap/heapam.c | 10 +++++++--- src/backend/access/heap/vacuumlazy.c | 2 +- src/backend/access/heap/visibilitymap.c | 2 +- src/backend/access/nbtree/nbtpage.c | 4 ++++ src/backend/access/spgist/spgvacuum.c | 8 ++++++++ src/backend/utils/cache/lsyscache.c | 16 ++++++++++++++++ src/include/access/gist_private.h | 6 +++--- src/include/access/gistxlog.h | 3 ++- src/include/access/hash_xlog.h | 1 + src/include/access/heapam_xlog.h | 8 ++++++-- src/include/access/nbtxlog.h | 2 ++ src/include/access/spgxlog.h | 1 + src/include/utils/lsyscache.h | 1 + src/include/utils/rel.h | 9 +++++++++ 19 files changed, 71 insertions(+), 16 deletions(-) diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 3f2b416ce1..a71d914384 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -344,7 +344,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, for (; ptr; ptr = ptr->next) { /* Allocate new page */ - ptr->buffer = gistNewBuffer(rel); + ptr->buffer = 
gistNewBuffer(heapRel, rel); GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); ptr->page = BufferGetPage(ptr->buffer); ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c index 9d3fa9c3b7..c6f0153d7b 100644 --- a/src/backend/access/gist/gistbuild.c +++ b/src/backend/access/gist/gistbuild.c @@ -290,7 +290,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo) Page page; /* initialize the root page */ - buffer = gistNewBuffer(index); + buffer = gistNewBuffer(heap, index); Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); page = BufferGetPage(buffer); diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index 615b5ade23..6169d6b294 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -823,7 +823,7 @@ gistcheckpage(Relation rel, Buffer buf) * Caller is responsible for initializing the page by calling GISTInitBuffer */ Buffer -gistNewBuffer(Relation r) +gistNewBuffer(Relation heapRel, Relation r) { Buffer buffer; bool needLock; @@ -867,7 +867,7 @@ gistNewBuffer(Relation r) * page's deleteXid. */ if (XLogStandbyInfoActive() && RelationNeedsWAL(r)) - gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page)); + gistXLogPageReuse(heapRel, r, blkno, GistPageGetDeleteXid(page)); return buffer; } diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 91b3e11182..6580c98cb8 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -616,7 +616,8 @@ gistXLogAssignLSN(void) * Write XLOG record about reuse of a deleted page. 
*/ void -gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemovedXid) +gistXLogPageReuse(Relation heapRel, Relation rel, + BlockNumber blkno, FullTransactionId latestRemovedXid) { gistxlogPageReuse xlrec_reuse; @@ -627,6 +628,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemov */ /* XLOG stuff */ + xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(heapRel); xlrec_reuse.node = rel->rd_node; xlrec_reuse.block = blkno; xlrec_reuse.latestRemovedFullXid = latestRemovedXid; diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c index 2ebe671967..e6cef08330 100644 --- a/src/backend/access/hash/hashinsert.c +++ b/src/backend/access/hash/hashinsert.c @@ -17,6 +17,7 @@ #include "access/hash.h" #include "access/hash_xlog.h" +#include "catalog/catalog.h" #include "miscadmin.h" #include "storage/buf_internals.h" #include "storage/lwlock.h" @@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf) xl_hash_vacuum_one_page xlrec; XLogRecPtr recptr; + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel); xlrec.latestRemovedXid = latestRemovedXid; xlrec.ntuples = ndeletable; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index a9583f3103..65bce76277 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7144,12 +7144,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel, * see comments for vacuum_log_cleanup_info(). 
*/ XLogRecPtr -log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid) +log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid) { xl_heap_cleanup_info xlrec; XLogRecPtr recptr; - xlrec.node = rnode; + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); + xlrec.node = rel->rd_node; xlrec.latestRemovedXid = latestRemovedXid; XLogBeginInsert(); @@ -7185,6 +7186,7 @@ log_heap_clean(Relation reln, Buffer buffer, /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln); xlrec.latestRemovedXid = latestRemovedXid; xlrec.nredirected = nredirected; xlrec.ndead = ndead; @@ -7235,6 +7237,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, /* nor when there are no tuples to freeze */ Assert(ntuples > 0); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln); xlrec.cutoff_xid = cutoff_xid; xlrec.ntuples = ntuples; @@ -7265,7 +7268,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, * heap_buffer, if necessary. 
*/ XLogRecPtr -log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, +log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer, TransactionId cutoff_xid, uint8 vmflags) { xl_heap_visible xlrec; @@ -7275,6 +7278,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, Assert(BufferIsValid(heap_buffer)); Assert(BufferIsValid(vm_buffer)); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); xlrec.cutoff_xid = cutoff_xid; xlrec.flags = vmflags; XLogBeginInsert(); diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 25f2d5df1b..4829ed9293 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -718,7 +718,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats) * No need to write the record at all unless it contains a valid value */ if (TransactionIdIsValid(vacrelstats->latestRemovedXid)) - (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid); + (void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid); } /* diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c index b1072183bc..9819977688 100644 --- a/src/backend/access/heap/visibilitymap.c +++ b/src/backend/access/heap/visibilitymap.c @@ -282,7 +282,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf, if (XLogRecPtrIsInvalid(recptr)) { Assert(!InRecovery); - recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf, + recptr = log_heap_visible(rel, heapBuf, vmBuf, cutoff_xid, flags); /* diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 793434c026..921aa75ac6 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -32,6 +32,7 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "storage/predicate.h" +#include "utils/lsyscache.h" #include "utils/memdebug.h" #include 
"utils/snapmgr.h" @@ -768,6 +769,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX */ /* XLOG stuff */ + xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid); xlrec_reuse.node = rel->rd_node; xlrec_reuse.block = blkno; xlrec_reuse.latestRemovedXid = latestRemovedXid; @@ -1323,6 +1325,8 @@ _bt_delitems_delete(Relation rel, Buffer buf, XLogRecPtr recptr; xl_btree_delete xlrec_delete; + xlrec_delete.onCatalogTable = + RelationIsAccessibleInLogicalDecoding(heapRel); xlrec_delete.latestRemovedXid = latestRemovedXid; xlrec_delete.ndeleted = ndeletable; diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index e1c58933f9..bb771b2fcf 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -27,6 +27,7 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "utils/snapmgr.h" +#include "utils/lsyscache.h" /* Entry in pending-list of TIDs we need to revisit */ @@ -503,6 +504,13 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer) spgxlogVacuumRedirect xlrec; GlobalVisState *vistest; + /* + * There is no chance of endless recursion even when we are doing catalog + * acceses here; because, spgist is never used for catalogs. Check + * comments in RelationIsAccessibleInLogicalDecoding(). 
+ */ + xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid); + xlrec.nToPlaceholder = 0; xlrec.newestRedirectXid = InvalidTransactionId; diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index 204bcd03c0..92ce304447 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -18,7 +18,9 @@ #include "access/hash.h" #include "access/htup_details.h" #include "access/nbtree.h" +#include "access/table.h" #include "bootstrap/bootstrap.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_am.h" #include "catalog/pg_amop.h" @@ -2035,6 +2037,20 @@ get_rel_persistence(Oid relid) return result; } +bool +get_rel_logical_catalog(Oid relid) +{ + bool res; + Relation rel; + + /* assume previously locked */ + rel = table_open(relid, NoLock); + res = RelationIsAccessibleInLogicalDecoding(rel); + table_close(rel, NoLock); + + return res; +} + /* ---------- TRANSFORM CACHE ---------- */ diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index b68c01a5f2..e2ba53ef74 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -439,8 +439,8 @@ extern XLogRecPtr gistXLogPageDelete(Buffer buffer, FullTransactionId xid, Buffer parentBuffer, OffsetNumber downlinkOffset); -extern void gistXLogPageReuse(Relation rel, BlockNumber blkno, - FullTransactionId latestRemovedXid); +extern void gistXLogPageReuse(Relation heapRel, Relation rel, + BlockNumber blkno, FullTransactionId latestRemovedXid); extern XLogRecPtr gistXLogUpdate(Buffer buffer, OffsetNumber *todelete, int ntodelete, @@ -484,7 +484,7 @@ extern bool gistproperty(Oid index_oid, int attno, extern bool gistfitpage(IndexTuple *itvec, int len); extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace); extern void gistcheckpage(Relation rel, Buffer buf); -extern Buffer gistNewBuffer(Relation r); +extern Buffer 
gistNewBuffer(Relation heapRel, Relation r); extern bool gistPageRecyclable(Page page); extern void gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off); diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h index 673afee1e1..723867bf15 100644 --- a/src/include/access/gistxlog.h +++ b/src/include/access/gistxlog.h @@ -49,9 +49,9 @@ typedef struct gistxlogPageUpdate */ typedef struct gistxlogDelete { + bool onCatalogTable; TransactionId latestRemovedXid; uint16 ntodelete; /* number of deleted offsets */ - /* * In payload of blk 0 : todelete OffsetNumbers */ @@ -97,6 +97,7 @@ typedef struct gistxlogPageDelete */ typedef struct gistxlogPageReuse { + bool onCatalogTable; RelFileNode node; BlockNumber block; FullTransactionId latestRemovedFullXid; diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h index d1aa6daa40..b44892cc48 100644 --- a/src/include/access/hash_xlog.h +++ b/src/include/access/hash_xlog.h @@ -250,6 +250,7 @@ typedef struct xl_hash_init_bitmap_page */ typedef struct xl_hash_vacuum_one_page { + bool onCatalogTable; TransactionId latestRemovedXid; int ntuples; diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 1525194112..0f111b733a 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -236,6 +236,7 @@ typedef struct xl_heap_update */ typedef struct xl_heap_clean { + bool onCatalogTable; TransactionId latestRemovedXid; uint16 nredirected; uint16 ndead; @@ -251,6 +252,7 @@ typedef struct xl_heap_clean */ typedef struct xl_heap_cleanup_info { + bool onCatalogTable; RelFileNode node; TransactionId latestRemovedXid; } xl_heap_cleanup_info; @@ -331,6 +333,7 @@ typedef struct xl_heap_freeze_tuple */ typedef struct xl_heap_freeze_page { + bool onCatalogTable; TransactionId cutoff_xid; uint16 ntuples; } xl_heap_freeze_page; @@ -345,6 +348,7 @@ typedef struct xl_heap_freeze_page */ typedef struct xl_heap_visible { + bool 
onCatalogTable; TransactionId cutoff_xid; uint8 flags; } xl_heap_visible; @@ -394,7 +398,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record); extern const char *heap2_identify(uint8 info); extern void heap_xlog_logical_rewrite(XLogReaderState *r); -extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode, +extern XLogRecPtr log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid); extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *redirected, int nredirected, @@ -413,7 +417,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple, bool *totally_frozen); extern void heap_execute_freeze_tuple(HeapTupleHeader tuple, xl_heap_freeze_tuple *xlrec_tp); -extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer, +extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags); #endif /* HEAPAM_XLOG_H */ diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h index 5c014bdc66..f5c60ff82f 100644 --- a/src/include/access/nbtxlog.h +++ b/src/include/access/nbtxlog.h @@ -186,6 +186,7 @@ typedef struct xl_btree_dedup */ typedef struct xl_btree_delete { + bool onCatalogTable; TransactionId latestRemovedXid; uint32 ndeleted; @@ -203,6 +204,7 @@ typedef struct xl_btree_delete */ typedef struct xl_btree_reuse_page { + bool onCatalogTable; RelFileNode node; BlockNumber block; TransactionId latestRemovedXid; diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h index 63d3c63db2..83121c7266 100644 --- a/src/include/access/spgxlog.h +++ b/src/include/access/spgxlog.h @@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot typedef struct spgxlogVacuumRedirect { + bool onCatalogTable; uint16 nToPlaceholder; /* number of redirects to make placeholders */ OffsetNumber firstPlaceholder; /* first placeholder tuple to remove */ TransactionId newestRedirectXid; /* newest XID of removed redirects */ diff --git 
a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index a990d11ea8..f7816542a4 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -138,6 +138,7 @@ extern char get_rel_relkind(Oid relid); extern bool get_rel_relispartition(Oid relid); extern Oid get_rel_tablespace(Oid relid); extern char get_rel_persistence(Oid relid); +extern bool get_rel_logical_catalog(Oid relid); extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes); extern Oid get_transform_tosql(Oid typid, Oid langid, List *trftypes); extern bool get_typisdefined(Oid typid); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index c5ffea40f2..6f5ffbfd27 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -16,6 +16,7 @@ #include "access/tupdesc.h" #include "access/xlog.h" +#include "catalog/catalog.h" #include "catalog/pg_class.h" #include "catalog/pg_index.h" #include "catalog/pg_publication.h" @@ -345,6 +346,9 @@ typedef struct StdRdOptions * RelationIsUsedAsCatalogTable * Returns whether the relation should be treated as a catalog table * from the pov of logical decoding. Note multiple eval of argument! + * This definition should not invoke anything that performs catalog + * access; otherwise it may cause infinite recursion. Check the comments + * in RelationIsAccessibleInLogicalDecoding() for details. */ #define RelationIsUsedAsCatalogTable(relation) \ ((relation)->rd_options && \ @@ -616,6 +620,11 @@ typedef struct ViewOptions * RelationIsAccessibleInLogicalDecoding * True if we need to log enough information to have access via * decoding snapshot. + * This definition should not invoke anything that performs catalog + * access. Otherwise, e.g. logging a WAL entry for catalog relation may + * invoke this function, which will in turn do catalog access, which may + * in turn cause another similar WAL entry to be logged, leading to + * infinite recursion. 
*/ #define RelationIsAccessibleInLogicalDecoding(relation) \ (XLogLogicalInfoActive() && \ -- 2.25.1
From 38a41501bb640137d1778c5c27989efa05e80783 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= <fabriziome...@gmail.com> Date: Mon, 14 Dec 2020 13:46:20 -0300 Subject: [PATCH v7 4/5] New TAP test for logical decoding on standby. This test was originally written by Craig Ringer, then extended/modified by me, to test various slot conflict scenarios. Authors: Craig Ringer, Amit Khandekar. --- src/test/perl/PostgresNode.pm | 37 +++ .../t/022_standby_logical_decoding_xmins.pl | 272 ++++++++++++++++++ .../023_standby_logical_decoding_conflicts.pl | 216 ++++++++++++++ 3 files changed, 525 insertions(+) create mode 100644 src/test/recovery/t/022_standby_logical_decoding_xmins.pl create mode 100644 src/test/recovery/t/023_standby_logical_decoding_conflicts.pl diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 9667f7667e..1d62700ca6 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -2223,6 +2223,43 @@ sub pg_recvlogical_upto =pod +=item $node->create_logical_slot_on_standby(self, master, slot_name, dbname) + +Create logical replication slot on given standby + +=cut + +sub create_logical_slot_on_standby +{ + my ($self, $master, $slot_name, $dbname) = @_; + my ($stdout, $stderr); + + my $handle; + + $handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr); + + # Once slot restart_lsn is created, the standby looks for xl_running_xacts + # WAL record from the restart_lsn onwards. So firstly, wait until the slot + # restart_lsn is evaluated. + + $self->poll_query_until( + 'postgres', qq[ + SELECT restart_lsn IS NOT NULL + FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name' + ]) or die "timed out waiting for logical slot to calculate its restart_lsn"; + + # Now arrange for the xl_running_xacts record for which pg_recvlogical + # is waiting. 
+ $master->safe_psql('postgres', 'CHECKPOINT'); + + $handle->finish(); + + is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created') + or die "could not create slot" . $slot_name; +} + +=pod + =back =cut diff --git a/src/test/recovery/t/022_standby_logical_decoding_xmins.pl b/src/test/recovery/t/022_standby_logical_decoding_xmins.pl new file mode 100644 index 0000000000..d654d79526 --- /dev/null +++ b/src/test/recovery/t/022_standby_logical_decoding_xmins.pl @@ -0,0 +1,272 @@ +# logical decoding on a standby : ensure xmins are appropriately updated + +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 23; +use RecursiveCopy; +use File::Copy; +use Time::HiRes qw(usleep); + +my ($stdin, $stdout, $stderr, $ret, $handle, $slot); + +my $node_master = get_new_node('master'); +my $node_standby = get_new_node('standby'); + +# Name for the physical slot on master +my $master_slotname = 'master_physical'; +# Name for the logical slot on standby +my $standby_slotname = 'standby_logical'; + +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for +# given boolean condition to be true to ensure we've reached a quiescent state +sub wait_for_xmins +{ + my ($node, $slotname, $check_expr) = @_; + + $node->poll_query_until( + 'postgres', qq[ + SELECT $check_expr + FROM pg_catalog.pg_replication_slots + WHERE slot_name = '$slotname'; + ]) or die "Timed out waiting for slot xmins to advance"; +} + + +######################## +# Initialize master node +######################## + +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', q{ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +log_min_messages = 'debug2' +log_error_verbosity = verbose +# very promptly terminate conflicting backends +max_standby_streaming_delay = '2s' +}); +$node_master->dump_info; +$node_master->start; + +$node_master->safe_psql('postgres', 
qq[SELECT * FROM pg_create_physical_replication_slot('$master_slotname');]); +my $backup_name = 'b1'; +$node_master->backup($backup_name); + +# After slot creation, xmins must be null +$slot = $node_master->slot($master_slotname); +is($slot->{'xmin'}, '', "xmin null"); +is($slot->{'catalog_xmin'}, '', "catalog_xmin null"); + +####################### +# Initialize slave node +####################### + +$node_standby->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_standby->append_conf('postgresql.conf', + qq[primary_slot_name = '$master_slotname']); +$node_standby->start; +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + + +################################ +# xmin/catalog_xmin verification before and after standby-logical-slot creation. +################################ + +# With hot_standby_feedback off, xmin and catalog_xmin must still be null +$slot = $node_master->slot($master_slotname); +is($slot->{'xmin'}, '', "xmin null after standby join"); +is($slot->{'catalog_xmin'}, '', "catalog_xmin null after standby join"); + +$node_standby->append_conf('postgresql.conf',q[ +hot_standby_feedback = on +# send status rapidly so we promptly advance xmin on master +wal_receiver_status_interval = 1 +]); +$node_standby->restart; + +# ensure walreceiver feedback sent by waiting for expected xmin and +# catalog_xmin on master. With hot_standby_feedback on, xmin should advance, +# but catalog_xmin should still remain NULL since there is no logical slot. +wait_for_xmins($node_master, $master_slotname, "xmin IS NOT NULL AND catalog_xmin IS NULL"); + +# Create new slots on the standby, ignoring the ones on the master completely. +# +# This must succeed since we know we have a catalog_xmin reservation. 
We +# might've already sent hot standby feedback to advance our physical slot's +# catalog_xmin but not received the corresponding xlog for the catalog xmin +# advance, in which case we'll create a slot that isn't usable. The calling +# application can prevent this by creating a temporary slot on the master to +# lock in its catalog_xmin. For a truly race-free solution we'd need +# master-to-standby hot_standby_feedback replies. +# +# In this case it won't race because there's no concurrent activity on the +# master. +# +$node_standby->create_logical_slot_on_standby($node_master, $standby_slotname, 'postgres'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +# Now that slot is created on standby, xmin and catalog_xmin should be non NULL +# on both master and standby. But on master, the xmins will be updated only +# after hot standby feedback is received. +wait_for_xmins($node_master, $master_slotname, "xmin IS NOT NULL AND catalog_xmin IS NOT NULL"); + +$slot = $node_standby->slot($standby_slotname); +is($slot->{'xmin'}, '', "logical xmin null"); +isnt($slot->{'catalog_xmin'}, '', "logical catalog_xmin not null"); + + +################################ +# Standby logical slot should be able to fetch the table changes even when the +# table is dropped. 
+################################ + +$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)'); +$node_master->safe_psql('postgres', q[INSERT INTO test_table(blah) values ('itworks')]); +$node_master->safe_psql('postgres', 'DROP TABLE test_table'); +$node_master->safe_psql('postgres', 'VACUUM'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +# Should show the inserts even when the table is dropped on master +($ret, $stdout, $stderr) = $node_standby->psql('postgres', qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]); +is($stderr, '', 'stderr is empty'); +is($ret, 0, 'replay from slot succeeded') + or die 'cannot continue if slot replay fails'; +is($stdout, q{BEGIN +table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks' +COMMIT}, 'replay results match'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +$slot = $node_master->slot($master_slotname); +isnt($slot->{'xmin'}, '', "physical xmin not null"); +my $saved_physical_catalog_xmin = $slot->{'catalog_xmin'}; +isnt($saved_physical_catalog_xmin, '', "physical catalog_xmin not null"); + +$slot = $node_standby->slot($standby_slotname); +is($slot->{'xmin'}, '', "logical xmin null"); +my $saved_logical_catalog_xmin = $slot->{'catalog_xmin'}; +isnt($saved_logical_catalog_xmin, '', "logical catalog_xmin not null"); + + +################################ +# Catalog xmins should advance after standby logical slot fetches the changes. +################################ + +# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot, +# we hold down xmin. +$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_1();]); +$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)'); +for my $i (0 .. 
2000) +{ + $node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]); +} +$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_2();]); +$node_master->safe_psql('postgres', 'VACUUM'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +cmp_ok($node_standby->slot($standby_slotname)->{'catalog_xmin'}, "==", + $saved_logical_catalog_xmin, + "logical slot catalog_xmin hasn't advanced before get_changes"); + +($ret, $stdout, $stderr) = $node_standby->psql('postgres', + qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]); +is($ret, 0, 'replay of big series succeeded'); +isnt($stdout, '', 'replayed some rows'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +# logical slot catalog_xmin on slave should advance after pg_logical_slot_get_changes +wait_for_xmins($node_standby, $standby_slotname, + "catalog_xmin::varchar::int > ${saved_logical_catalog_xmin}"); +$slot = $node_standby->slot($standby_slotname); +my $new_logical_catalog_xmin = $slot->{'catalog_xmin'}; +is($slot->{'xmin'}, '', "logical xmin null"); + +# hot standby feedback should advance master's phys catalog_xmin now that the +# standby's slot doesn't hold it down as far. +# But master's phys catalog_xmin should not go past the slave's logical slot's +# catalog_xmin, even while master's phys xmin advances. +# +# First, make sure master's xmin is advanced. This happens on hot standby +# feedback. So this check for master's xmin advance also makes sure hot standby +# feedback has reached the master, which is required for the subsequent +# catalog_xmin test. 
+my $temp_phys_xmin = $node_master->slot($master_slotname)->{'xmin'}; +$node_master->safe_psql('postgres', 'SELECT txid_current()'); +wait_for_xmins($node_master, $master_slotname, + "xmin::varchar::int > ${temp_phys_xmin}"); +$slot = $node_master->slot($master_slotname); +# Now check that the master's phys catalog_xmin has advanced but not beyond +# standby's logical catalog_xmin +cmp_ok($slot->{'catalog_xmin'}, ">", $saved_physical_catalog_xmin, + 'upstream physical slot catalog_xmin has advanced with hs_feedback on'); +cmp_ok($slot->{'catalog_xmin'}, "==", $new_logical_catalog_xmin, + 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on'); + + +###################### +# Upstream oldestXid should not go past downstream catalog_xmin +###################### + +# First burn some xids on the master in another DB, so we push the master's +# nextXid ahead. +foreach my $i (1 .. 100) +{ + $node_master->safe_psql('postgres', 'SELECT txid_current()'); +} + +# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance +# past our needed xmin. The only way we have visibility into that is to force +# a checkpoint. 
+$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'"); +foreach my $dbname ('template1', 'postgres', 'postgres', 'template0') +{ + $node_master->safe_psql($dbname, 'VACUUM FREEZE'); +} +$node_master->safe_psql('postgres', 'CHECKPOINT'); +IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout) + or die "pg_controldata failed with $?"; +my @checkpoint = split('\n', $stdout); +my $oldestXid = ''; +foreach my $line (@checkpoint) +{ + if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/) + { + $oldestXid = $1; + } +} +die 'no oldestXID found in checkpoint' unless $oldestXid; + +cmp_ok($oldestXid, "<=", $node_standby->slot($standby_slotname)->{'catalog_xmin'}, + 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on'); + +$node_master->safe_psql('postgres', + "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'"); + + +################################################## +# Drop slot +# Make sure standby slots are droppable, and properly clear the upstream's xmin +################################################## + +is($node_standby->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on'); + +$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$standby_slotname')]); + +is($node_standby->slot($standby_slotname)->{'slot_type'}, '', 'slot on standby dropped manually'); + +# ensure walreceiver feedback sent by waiting for expected xmin and +# catalog_xmin on master. catalog_xmin should become NULL because we dropped +# the logical slot. 
+wait_for_xmins($node_master, $master_slotname, + "xmin IS NOT NULL AND catalog_xmin IS NULL"); diff --git a/src/test/recovery/t/023_standby_logical_decoding_conflicts.pl b/src/test/recovery/t/023_standby_logical_decoding_conflicts.pl new file mode 100644 index 0000000000..d0c449338f --- /dev/null +++ b/src/test/recovery/t/023_standby_logical_decoding_conflicts.pl @@ -0,0 +1,216 @@ +# logical decoding on a standby : test conflict recovery; and other tests that +# verify slots get dropped as expected. + +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 24; +use RecursiveCopy; +use File::Copy; +use Time::HiRes qw(usleep); + +my ($stdin, $stdout, $stderr, $ret, $handle, $slot); + +my $node_master = get_new_node('master'); +my $node_standby = get_new_node('standby'); + +# Name for the physical slot on master +my $master_slotname = 'master_physical'; + +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for +# given boolean condition to be true to ensure we've reached a quiescent state +sub wait_for_xmins +{ + my ($node, $slotname, $check_expr) = @_; + + $node->poll_query_until( + 'postgres', qq[ + SELECT $check_expr + FROM pg_catalog.pg_replication_slots + WHERE slot_name = '$slotname'; + ]) or die "Timed out waiting for slot xmins to advance"; +} + +# Create the required logical slots on standby. 
+sub create_logical_slots +{ + $node_standby->create_logical_slot_on_standby($node_master, 'dropslot', 'testdb'); + $node_standby->create_logical_slot_on_standby($node_master, 'activeslot', 'testdb'); +} + +# Acquire one of the standby logical slots created by create_logical_slots() +sub make_slot_active +{ + my $slot_user_handle; + + # make sure activeslot is in use + print "starting pg_recvlogical\n"; + $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node_standby->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr); + + while (!$node_standby->slot('activeslot')->{'active_pid'}) + { + usleep(100_000); + print "waiting for slot to become active\n"; + } + return $slot_user_handle; +} + +# Check if all the slots on standby are dropped. These include the 'activeslot' +# that was acquired by make_slot_active(), and the non-active 'dropslot'. +sub check_slots_dropped +{ + my ($slot_user_handle) = @_; + my $return; + + is($node_standby->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped'); + is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped'); + + # our client should've terminated in response to the walsender error + eval { + $slot_user_handle->finish; + }; + $return = $?; + cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero\n"); + if ($return) { + like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict'); + like($stderr, qr/must be dropped/, 'recvlogical error detail'); + } + + return 0; +} + + +######################## +# Initialize master node +######################## + +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', q{ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +log_min_messages = 'debug2' +log_error_verbosity = verbose +# send status rapidly so we promptly advance xmin on master +wal_receiver_status_interval = 1 +# very promptly 
terminate conflicting backends +max_standby_streaming_delay = '2s' +}); +$node_master->dump_info; +$node_master->start; + +$node_master->psql('postgres', q[CREATE DATABASE testdb]); + +$node_master->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$master_slotname');]); +my $backup_name = 'b1'; +$node_master->backup($backup_name); + +####################### +# Initialize slave node +####################### + +$node_standby->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_standby->append_conf('postgresql.conf', + qq[primary_slot_name = '$master_slotname']); +$node_standby->start; +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + + +################################################## +# Recovery conflict: Drop conflicting slots, including in-use slots +# Scenario 1 : hot_standby_feedback off +################################################## + +create_logical_slots(); + +# One way to reproduce recovery conflict is to run VACUUM FULL with +# hot_standby_feedback turned off on slave. +$node_standby->append_conf('postgresql.conf',q[ +hot_standby_feedback = off +]); +$node_standby->restart; +# ensure walreceiver feedback off by waiting for expected xmin and +# catalog_xmin on master. Both should be NULL since hs_feedback is off +wait_for_xmins($node_master, $master_slotname, + "xmin IS NULL AND catalog_xmin IS NULL"); + +$handle = make_slot_active(); + +# This should trigger the conflict +$node_master->safe_psql('testdb', 'VACUUM FULL'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +check_slots_dropped($handle); + +# Turn hot_standby_feedback back on +$node_standby->append_conf('postgresql.conf',q[ +hot_standby_feedback = on +]); +$node_standby->restart; + +# ensure walreceiver feedback sent by waiting for expected xmin and +# catalog_xmin on master. 
With hot_standby_feedback on, xmin should advance, +# but catalog_xmin should still remain NULL since there is no logical slot. +wait_for_xmins($node_master, $master_slotname, + "xmin IS NOT NULL AND catalog_xmin IS NULL"); + +################################################## +# Recovery conflict: Drop conflicting slots, including in-use slots +# Scenario 2 : incorrect wal_level at master +################################################## + +create_logical_slots(); + +$handle = make_slot_active(); + +# Make master wal_level replica. This will trigger slot conflict. +$node_master->append_conf('postgresql.conf',q[ +wal_level = 'replica' +]); +$node_master->restart; + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +check_slots_dropped($handle); + +# Restore master wal_level +$node_master->append_conf('postgresql.conf',q[ +wal_level = 'logical' +]); +$node_master->restart; +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + + +################################################## +# DROP DATABASE should drops it's slots, including active slots. +################################################## + +create_logical_slots(); +$handle = make_slot_active(); + +# Create a slot on a database that would not be dropped. This slot should not +# get dropped. +$node_standby->create_logical_slot_on_standby($node_master, 'otherslot', 'postgres'); + +# dropdb on the master to verify slots are dropped on standby +$node_master->safe_psql('postgres', q[DROP DATABASE testdb]); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +is($node_standby->safe_psql('postgres', + q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f', + 'database dropped on standby'); + +check_slots_dropped($handle); + +is($node_standby->slot('otherslot')->{'slot_type'}, 'logical', + 'otherslot on standby not dropped'); + +# Cleanup : manually drop the slot that was not dropped. 
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]); -- 2.25.1
From 6865ce782d8f2416255598b935fded887660a33b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= <fabriziome...@gmail.com> Date: Mon, 14 Dec 2020 13:47:41 -0300 Subject: [PATCH v7 5/5] Doc changes describing details about logical decoding on standby --- doc/src/sgml/logicaldecoding.sgml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 813a037fac..003520dc02 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -248,6 +248,24 @@ $ pg_recvlogical -d postgres --slot=test --drop-slot may consume changes from a slot at any given time. </para> + <para> + A logical replication slot can also be created on a hot standby. To prevent + <command>VACUUM</command> from removing required rows from the system + catalogs, <varname>hot_standby_feedback</varname> should be enabled on the + standby. Even so, if any required rows get removed, the slot is + dropped. Existing logical slots on the standby are also dropped if + <varname>wal_level</varname> on the primary is reduced below <literal>logical</literal>. + </para> + + <para> + To create a logical slot, a historic snapshot must be built, which requires + information about all currently running transactions. On the primary, this + information is readily available, but on a standby it has to be obtained + from the WAL stream sent by the primary. So slot creation may wait for some + activity to happen on the primary. If the primary is idle, creating a + logical slot on a standby may take a noticeable amount of time. + </para> + <caution> <para> Replication slots persist across crashes and know nothing about the state -- 2.25.1
From 873fd297349c309c26245afcd09e92ba09dd76fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= <fabriziome...@gmail.com> Date: Mon, 14 Dec 2020 11:52:02 -0300 Subject: [PATCH v7 3/5] Handle logical slot conflicts on standby. During WAL replay on a standby, when a slot conflict is identified, drop the conflicting slots. Also do the same if wal_level on the master is reduced below logical while logical slots exist on the standby. Introduce a new ProcSignalReason value for slot conflict recovery. Add a new pg_stat_database_conflicts column: confl_logicalslot. Amit Khandekar, reviewed by Andres Freund. --- doc/src/sgml/monitoring.sgml | 10 ++ src/backend/access/gist/gistxlog.c | 4 +- src/backend/access/hash/hash_xlog.c | 3 +- src/backend/access/heap/heapam.c | 13 +- src/backend/access/nbtree/nbtxlog.c | 4 +- src/backend/access/spgist/spgxlog.c | 1 + src/backend/access/transam/xlog.c | 14 ++ src/backend/catalog/system_views.sql | 1 + src/backend/postmaster/pgstat.c | 4 + src/backend/replication/slot.c | 188 +++++++++++++++++++++++++++ src/backend/storage/ipc/procarray.c | 4 + src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 4 +- src/backend/tcop/postgres.c | 22 ++++ src/backend/utils/adt/pgstatfuncs.c | 16 +++ src/include/catalog/pg_proc.dat | 5 + src/include/pgstat.h | 1 + src/include/replication/slot.h | 2 + src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 2 +- src/test/regress/expected/rules.out | 1 + 21 files changed, 294 insertions(+), 9 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 52a69a5366..788f17c3a4 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -3842,6 +3842,16 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i deadlocks </para></entry> </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>confl_logicalslot</structfield>
<type>bigint</type> + </para> + <para> + Number of queries in this database that have been canceled due to + logical slots + </para></entry> + </row> </tbody> </tgroup> </table> diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 6580c98cb8..5c4a3cdc5b 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) @@ -413,6 +414,7 @@ gistRedoPageReuse(XLogReaderState *record) latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid); ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); } } diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index 3c60677662..5767890fa4 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) RelFileNode rnode; XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 65bce76277..46c79def07 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7702,7 +7702,8 @@ heap_xlog_cleanup_info(XLogReaderState *record) xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record); if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, 
xlrec->node); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); /* * Actual operation is a no-op. Record type exists to provide a means for @@ -7738,7 +7739,8 @@ heap_xlog_clean(XLogReaderState *record) * latestRemovedXid is invalid, skip conflict processing. */ if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid)) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); /* * If we have a full-page image, restore it (using a cleanup lock) and @@ -7834,7 +7836,9 @@ heap_xlog_visible(XLogReaderState *record) * rather than killing the transaction outright. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, + xlrec->onCatalogTable, + rnode); /* * Read the heap page, if it still exists. If the heap file has dropped or @@ -7971,7 +7975,8 @@ heap_xlog_freeze_page(XLogReaderState *record) TransactionIdRetreat(latestRemovedXid); XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 5135b800af..46591d54fa 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -662,7 +662,8 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); } /* @@ -959,6 +960,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) { 
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); } } diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index 999d0ca15d..3782b3a69a 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, + xldata->onCatalogTable, node); } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 609edbaca6..2c285a6964 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -10255,6 +10255,20 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); + /* + * Drop logical slots if we are in hot standby and the primary does not + * have a WAL level sufficient for logical decoding. No need to search + * for potentially conflicting logically slots if standby is running + * with wal_level lower than logical, because in that case, we would + * have either disallowed creation of logical slots or dropped existing + * ones. 
+ */ + if (InRecovery && InHotStandby && + xlrec.wal_level < WAL_LEVEL_LOGICAL && + wal_level >= WAL_LEVEL_LOGICAL) + ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId, + gettext_noop("Logical decoding on standby requires wal_level >= logical on master.")); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b140c210bc..b67d0f6ef9 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -938,6 +938,7 @@ CREATE VIEW pg_stat_database_conflicts AS pg_stat_get_db_conflict_tablespace(D.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(D.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot, + pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_logicalslot, pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock FROM pg_database D; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 6b60f293e9..b52755084d 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4978,6 +4978,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) dbentry->n_conflict_tablespace = 0; dbentry->n_conflict_lock = 0; dbentry->n_conflict_snapshot = 0; + dbentry->n_conflict_logicalslot = 0; dbentry->n_conflict_bufferpin = 0; dbentry->n_conflict_startup_deadlock = 0; dbentry->n_temp_files = 0; @@ -6846,6 +6847,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: dbentry->n_conflict_snapshot++; break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + dbentry->n_conflict_logicalslot++; + break; case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: dbentry->n_conflict_bufferpin++; break; diff --git 
a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 8a0f51907b..faceab0fa3 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,6 +46,7 @@ #include "pgstat.h" #include "replication/slot.h" #include "storage/fd.h" +#include "storage/lock.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" @@ -103,6 +104,7 @@ static int ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name, SlotAcquireBehavior behavior); static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); +static void ReplicationSlotDropConflicting(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -711,6 +713,64 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) LWLockRelease(ReplicationSlotAllocationLock); } +/* + * Permanently drop a conflicting replication slot. If it's already active by + * another backend, send it a recovery conflict signal, and then try again. 
+ */ +static void +ReplicationSlotDropConflicting(ReplicationSlot *slot) +{ + pid_t active_pid; + PGPROC *proc; + VirtualTransactionId vxid; + + ConditionVariablePrepareToSleep(&slot->active_cv); + while (1) + { + SpinLockAcquire(&slot->mutex); + active_pid = slot->active_pid; + if (active_pid == 0) + active_pid = slot->active_pid = MyProcPid; + SpinLockRelease(&slot->mutex); + + /* Drop the acquired slot, unless it is acquired by another backend */ + if (active_pid == MyProcPid) + { + elog(DEBUG1, "acquired conflicting slot, now dropping it"); + ReplicationSlotDropPtr(slot); + break; + } + + /* Send the other backend, a conflict recovery signal */ + + SetInvalidVirtualTransactionId(vxid); + LWLockAcquire(ProcArrayLock, LW_SHARED); + proc = BackendPidGetProcWithLock(active_pid); + if (proc) + GET_VXID_FROM_PGPROC(vxid, *proc); + LWLockRelease(ProcArrayLock); + + /* + * If coincidently that process finished, some other backend may + * acquire the slot again. So start over again. + * Note: Even if vxid.localTransactionId is invalid, we need to cancel + * that backend, because there is no other way to make it release the + * slot. So don't bother to validate vxid.localTransactionId. + */ + if (vxid.backendId == InvalidBackendId) + continue; + + elog(DEBUG1, "cancelling pid %d (backendId: %d) for releasing slot", + active_pid, vxid.backendId); + + CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + ConditionVariableSleep(&slot->active_cv, + WAIT_EVENT_REPLICATION_SLOT_DROP); + } + + ConditionVariableCancelSleep(); +} + /* * Serialize the currently acquired slot's state from memory to disk, thereby * guaranteeing the current state will survive a crash. 
@@ -1141,12 +1201,25 @@ ReplicationSlotReserveWal(void)
 		{
 			XLogRecPtr	flushptr;
 
+			/* start at current insert position */
+			restart_lsn = GetXLogInsertRecPtr();
+			SpinLockAcquire(&slot->mutex);
+			slot->data.restart_lsn = restart_lsn;
+			SpinLockRelease(&slot->mutex);
+
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
 
 			/* and make sure it's fsynced to disk */
 			XLogFlush(flushptr);
 		}
+		else
+		{
+			restart_lsn = GetRedoRecPtr();
+			SpinLockAcquire(&slot->mutex);
+			slot->data.restart_lsn = restart_lsn;
+			SpinLockRelease(&slot->mutex);
+		}
 
 		/* prevent WAL removal as fast as possible */
 		ReplicationSlotsComputeRequiredLSN();
@@ -1164,6 +1237,125 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Resolve recovery conflicts with logical slots.
+ *
+ * When xid is valid, rows older than xid might have been removed, so logical
+ * slots of database dboid (or of no database) whose xmin or catalog_xmin
+ * precedes or equals xid are dropped, and the error detail is built from
+ * those values; conflict_reason is then ignored.  When xid is invalid, all
+ * logical slots are dropped.  This is required when the master wal_level is
+ * set back to replica.  In that case conflict_reason, if not NULL, is used as
+ * the error detail.
+ */
+void
+ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid,
+										char *conflict_reason)
+{
+	int			i;
+	bool		found_conflict = false;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	if (found_conflict)
+	{
+		CHECK_FOR_INTERRUPTS();
+		found_conflict = false;
+	}
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		char	   *detail = conflict_reason;
+		NameData	slotname;
+
+		/* cannot change while ReplicationSlotControlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* We are only dealing with *logical* slot conflicts. */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* Invalid xid means caller is asking to drop all logical slots */
+		if (!TransactionIdIsValid(xid))
+			found_conflict = true;
+		else
+		{
+			TransactionId slot_xmin;
+			TransactionId slot_catalog_xmin;
+			StringInfoData conflict_xmins;
+			const char *conflict_sentence =
+				gettext_noop("Slot conflicted with xid horizon which was being increased to");
+
+			/* not our database, skip */
+			if (s->data.database != InvalidOid && s->data.database != dboid)
+				continue;
+
+			SpinLockAcquire(&s->mutex);
+			slot_xmin = s->data.xmin;
+			slot_catalog_xmin = s->data.catalog_xmin;
+			SpinLockRelease(&s->mutex);
+
+			/*
+			 * Build the detail, which will look like:
+			 * "Slot conflicted with xid horizon which was being increased
+			 * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)."
+			 * TransactionId is unsigned, hence %u rather than %d.
+			 */
+			initStringInfo(&conflict_xmins);
+			if (TransactionIdIsValid(slot_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_xmin, xid))
+				appendStringInfo(&conflict_xmins, "slot xmin: %u", slot_xmin);
+			if (TransactionIdIsValid(slot_catalog_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+				appendStringInfo(&conflict_xmins, "%sslot catalog_xmin: %u",
+								 conflict_xmins.len > 0 ? ", " : "",
+								 slot_catalog_xmin);
+
+			if (conflict_xmins.len > 0)
+			{
+				/* _() because gettext_noop() above only marks the text */
+				detail = psprintf("%s %u (%s).", _(conflict_sentence), xid,
+								  conflict_xmins.data);
+				found_conflict = true;
+			}
+
+			/* This runs for every scanned slot: don't leak the scratch buffer */
+			pfree(conflict_xmins.data);
+		}
+
+		if (!found_conflict)
+			continue;
+
+		SpinLockAcquire(&s->mutex);
+		slotname = s->data.name;
+		SpinLockRelease(&s->mutex);
+
+		/*
+		 * Release our lock before dropping, as the drop path takes slot locks
+		 * of its own.  That is also why the slot array is re-scanned from the
+		 * start afterwards.
+		 */
+		LWLockRelease(ReplicationSlotControlLock);
+
+		ReplicationSlotDropConflicting(s);
+
+		ereport(LOG,
+				(errmsg("dropped conflicting slot %s", NameStr(slotname)),
+				 detail ? errdetail("%s", detail) : 0));
+		if (detail != conflict_reason)
+			pfree(detail);
+
+		goto restart;
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index ee912b9d5e..18c23927d5 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -3339,6 +3339,10 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
 
 		GET_VXID_FROM_PGPROC(procvxid, *proc);
 
+		/*
+		 * Note: vxid.localTransactionId can be invalid, which means the
+		 * request is to signal the pid that is not running a transaction.
+		 */
 		if (procvxid.backendId == vxid.backendId &&
 			procvxid.localTransactionId == vxid.localTransactionId)
 		{
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index ffe67acea1..8047bcce63 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -579,6 +579,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 4ea3cf1f5c..a66bfcebf1 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -23,6 +23,7 @@
 #include "access/xloginsert.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -300,7 +301,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 }
 
 void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node) +ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, + bool onCatalogTable, RelFileNode node) { VirtualTransactionId *backends; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 3679799e50..649f419903 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2465,6 +2465,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: errdetail("User query might have needed to see row versions that must be removed."); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + errdetail("User was using the logical slot that must be dropped."); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; @@ -2933,6 +2936,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_LOCK: case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + /* + * For conflicts that require a logical slot to be dropped, the + * signal receiver must release the slot so that the signal sender + * can drop it. Normal backends fall through to the handling shared + * with the other conflicts below. A walsender on standby holding a + * logical slot bypasses the transaction check below: a query cancel + * (not termination) is always requested, which is expected to make + * it error out and release the slot. + */ + if (am_cascading_walsender && + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT && + MyReplicationSlot && SlotIsLogical(MyReplicationSlot)) + { + RecoveryConflictPending = true; + QueryCancelPending = true; + InterruptPending = true; + break; + } /* * If we aren't in a transaction any longer then ignore.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 6afe1b6f56..7a2da6780b 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1494,6 +1494,21 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS)
 	PG_RETURN_INT64(result);
 }
 
+Datum
+pg_stat_get_db_conflict_logicalslot(PG_FUNCTION_ARGS)
+{
+	Oid			dbid = PG_GETARG_OID(0);
+	int64		result;
+	PgStat_StatDBEntry *dbentry;
+
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (dbentry->n_conflict_logicalslot);
+
+	PG_RETURN_INT64(result);
+}
+
 Datum
 pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS)
 {
@@ -1537,6 +1552,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
 		result = (int64) (dbentry->n_conflict_tablespace +
 						  dbentry->n_conflict_lock +
 						  dbentry->n_conflict_snapshot +
+						  dbentry->n_conflict_logicalslot +
 						  dbentry->n_conflict_bufferpin +
 						  dbentry->n_conflict_startup_deadlock);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index e6c7b070f6..bd79bc2b20 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5389,6 +5389,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '3585',
+  descr => 'statistics: recovery conflicts in database caused by logical replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
   descr => 'statistics: recovery conflicts in database caused by shared buffer pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5954068dec..b6627a7e76 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -687,6 +687,7 @@ typedef struct PgStat_StatDBEntry
 	PgStat_Counter n_conflict_tablespace;
 	PgStat_Counter n_conflict_lock;
 	PgStat_Counter n_conflict_snapshot;
+	PgStat_Counter n_conflict_logicalslot;
 	PgStat_Counter n_conflict_bufferpin;
 	PgStat_Counter n_conflict_startup_deadlock;
 	PgStat_Counter n_temp_files;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 63bab6967f..90027da054 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -217,4 +217,7 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid,
+													char *conflict_reason);
+
 #endif							/* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 5cb39697f3..ffcf9e2d49 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -40,6 +40,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 	PROCSIG_RECOVERY_CONFLICT_LOCK,
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+	PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index faaf1d3817..bc3471777c 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
-												RelFileNode node);
+												bool onCatalogTable, RelFileNode node);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6293ab57bc..9d5bd887bf 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1860,6 +1860,7 @@ pg_stat_database_conflicts| SELECT d.oid AS datid,
     pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace,
     pg_stat_get_db_conflict_lock(d.oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot,
+    pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_logicalslot,
     pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin,
     pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock
    FROM pg_database d;
-- 
2.25.1