On Wed, Mar 18, 2020 at 4:50 PM Alvaro Herrera <[email protected]>
wrote:
>
>
> well, not "forever", but:
>
> $ make check PROVE_TESTS=t/019_standby_logical_decoding_conflicts.pl
PROVE_FLAGS=-v
> ...
> cd /pgsql/source/master/src/test/recovery &&
TESTDIR='/home/alvherre/mnt/crypt/alvherre/Code/pgsql/build/master/src/test/recovery'
PATH="/pgsql/build/master/tmp_install/pgsql/install/master/bin:$PATH"
LD_LIBRARY_PATH="/pgsql/build/master/tmp_install/pgsql/install/master/lib"
PGPORT='655432'
PG_REGRESS='/home/alvherre/mnt/crypt/alvherre/Code/pgsql/build/master/src/test/recovery/../../../src/test/regress/pg_regress'
REGRESS_SHLIB='/pgsql/build/master/src/test/regress/regress.so'
/usr/bin/prove -I /pgsql/source/master/src/test/perl/ -I
/pgsql/source/master/src/test/recovery -v t/
019_standby_logical_decoding_conflicts.pl
> t/019_standby_logical_decoding_conflicts.pl ..
> 1..24
> ok 1 - dropslot on standby created
> ok 2 - activeslot on standby created
> # poll_query_until timed out executing this query:
> # SELECT '0/35C9190' <= replay_lsn AND state = 'streaming' FROM
pg_catalog.pg_stat_replication WHERE application_name = 'standby';
> # expecting this output:
> # t
> # last actual query output:
> #
> # with stderr:
> Bailout called. Further testing stopped: system pg_ctl failed
> Bail out! system pg_ctl failed
> FAILED--Further testing stopped: system pg_ctl failed
> make: *** [Makefile:19: check] Error 255
>
After rebasing and making minimal tweaks (duplicated OID, TAP test numbering),
I'm facing a similar problem, but in a different place:
make -C src/test/recovery check PROVE_TESTS=t/
023_standby_logical_decoding_conflicts.pl PROVE_FLAGS=-v
...
/usr/bin/mkdir -p '/data/src/pg/main/src/test/recovery'/tmp_check
cd . && TESTDIR='/data/src/pg/main/src/test/recovery'
PATH="/d/src/pg/main/tmp_install/home/fabrizio/pgsql/logical-decoding-standby/bin:$PATH"
LD_LIBRARY_PATH="/d/src/pg/main/tmp_install/home/fabrizio/pgsql/logical-decoding-standby/lib"
PGPORT='65432'
PG_REGRESS='/data/src/pg/main/src/test/recovery/../../../src/test/regress/pg_regress'
REGRESS_SHLIB='/d/src/pg/main/src/test/regress/regress.so' /usr/bin/prove
-I ../../../src/test/perl/ -I . -v t/
023_standby_logical_decoding_conflicts.pl
t/023_standby_logical_decoding_conflicts.pl ..
1..24
ok 1 - dropslot on standby created
ok 2 - activeslot on standby created
not ok 3 - dropslot on standby dropped
# Failed test 'dropslot on standby dropped'
# at t/023_standby_logical_decoding_conflicts.pl line 67.
# got: 'logical'
# expected: ''
not ok 4 - activeslot on standby dropped
# Failed test 'activeslot on standby dropped'
# at t/023_standby_logical_decoding_conflicts.pl line 68.
# got: 'logical'
# expected: ''
TAP tests hang forever in `check_slots_dropped` exactly here:
# our client should've terminated in response to the walsender error
eval {
$slot_user_handle->finish;
};
Regards,
--
Fabrízio de Royes Mello
PostgreSQL Developer at OnGres Inc. - https://ongres.com
From a3e42f2fc53afd2bcbe6da9d029d91694a34562e Mon Sep 17 00:00:00 2001
From: Amit Khandekar <[email protected]>
Date: Thu, 16 Jan 2020 10:05:15 +0530
Subject: [PATCH v7 1/5] Allow logical decoding on standby.
Allow a logical slot to be created on standby. Restrict its usage
or its creation if wal_level on primary is less than logical.
During slot creation, its restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on standby waits for an
xl_running_xacts record to arrive from primary. Conflicting slots
are handled in subsequent commits.
Andres Freund and Amit Khandekar.
---
src/backend/access/transam/xlog.c | 11 +++++
src/backend/replication/logical/decode.c | 22 ++++++++-
src/backend/replication/logical/logical.c | 37 ++++++++-------
src/backend/replication/slot.c | 57 +++++++++++++++--------
src/backend/replication/walsender.c | 10 ++--
src/include/access/xlog.h | 1 +
6 files changed, 98 insertions(+), 40 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8dd225c2e1..609edbaca6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5038,6 +5038,17 @@ LocalProcessControlFile(bool reset)
ReadControlFile();
}
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevel(void)
+{
+ return ControlFile->wal_level;
+}
+
/*
* Initialization of shared memory for XLOG
*/
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 3f84ee99b8..bb7c80d6cc 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -203,11 +203,31 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* can restart from there.
*/
break;
+ case XLOG_PARAMETER_CHANGE:
+ {
+ xl_parameter_change *xlrec =
+ (xl_parameter_change *) XLogRecGetData(buf->record);
+
+ /*
+ * If wal_level on primary is reduced to less than logical, then we
+ * want to prevent existing logical slots from being used.
+ * Existing logical slots on standby get dropped when this WAL
+ * record is replayed; and further, slot creation fails when the
+ * wal level is not sufficient; but all these operations are not
+ * synchronized, so a logical slot may creep in while the wal_level
+ * is being reduced. Hence this extra check.
+ */
+ if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on standby requires "
+ "wal_level >= logical on master")));
+ break;
+ }
case XLOG_NOOP:
case XLOG_NEXTOID:
case XLOG_SWITCH:
case XLOG_BACKUP_END:
- case XLOG_PARAMETER_CHANGE:
case XLOG_RESTORE_POINT:
case XLOG_FPW_CHANGE:
case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f1f4df7d70..ec4f7b95e4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -110,23 +110,22 @@ CheckLogicalDecodingRequirements(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding requires a database connection")));
- /* ----
- * TODO: We got to change that someday soon...
- *
- * There's basically three things missing to allow this:
- * 1) We need to be able to correctly and quickly identify the timeline a
- * LSN belongs to
- * 2) We need to force hot_standby_feedback to be enabled at all times so
- * the primary cannot remove rows we need.
- * 3) support dropping replication slots referring to a database, in
- * dbase_redo. There can't be any active ones due to HS recovery
- * conflicts, so that should be relatively easy.
- * ----
- */
if (RecoveryInProgress())
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("logical decoding cannot be used while in recovery")));
+ {
+ /*
+ * This check may have race conditions, but whenever
+ * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+ * verify that there are no existing logical replication slots. And to
+ * avoid races around creating a new slot,
+ * CheckLogicalDecodingRequirements() is called once before creating
+ * the slot, and once when logical decoding is initially starting up.
+ */
+ if (GetActiveWalLevel() < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on standby requires "
+ "wal_level >= logical on master")));
+ }
}
/*
@@ -295,6 +294,12 @@ CreateInitDecodingContext(const char *plugin,
LogicalDecodingContext *ctx;
MemoryContext old_context;
+ /*
+ * On standby, this check is also required while creating the slot. See
+ * the comments in CheckLogicalDecodingRequirements().
+ */
+ CheckLogicalDecodingRequirements();
+
/* shorter lines... */
slot = MyReplicationSlot;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 9c7cf13d4d..8a0f51907b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1097,37 +1097,56 @@ ReplicationSlotReserveWal(void)
/*
* For logical slots log a standby snapshot and start logical decoding
* at exactly that position. That allows the slot to start up more
- * quickly.
+ * quickly. But on a standby we cannot do WAL writes, so just use the
+ * replay pointer; effectively, an attempt to create a logical slot on
+ * standby will cause it to wait for an xl_running_xact record to be
+ * logged independently on the primary, so that a snapshot can be built
+ * using the record.
*
- * That's not needed (or indeed helpful) for physical slots as they'll
- * start replay at the last logged checkpoint anyway. Instead return
- * the location of the last redo LSN. While that slightly increases
- * the chance that we have to retry, it's where a base backup has to
- * start replay at.
+ * None of this is needed (or indeed helpful) for physical slots as
+ * they'll start replay at the last logged checkpoint anyway. Instead
+ * return the location of the last redo LSN. While that slightly
+ * increases the chance that we have to retry, it's where a base backup
+ * has to start replay at.
*/
+ if (SlotIsPhysical(slot))
+ restart_lsn = GetRedoRecPtr();
+ else if (RecoveryInProgress())
+ {
+ restart_lsn = GetXLogReplayRecPtr(NULL);
+ /*
+ * Replay pointer may point one past the end of the record. If that
+ * is a XLOG page boundary, it will not be a valid LSN for the
+ * start of a record, so bump it up past the page header.
+ */
+ if (!XRecOffIsValid(restart_lsn))
+ {
+ if (restart_lsn % XLOG_BLCKSZ != 0)
+ elog(ERROR, "invalid replay pointer");
+ /* For the first page of a segment file, it's a long header */
+ if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
+ restart_lsn += SizeOfXLogLongPHD;
+ else
+ restart_lsn += SizeOfXLogShortPHD;
+ }
+ }
+ else
+ restart_lsn = GetXLogInsertRecPtr();
+
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
+
if (!RecoveryInProgress() && SlotIsLogical(slot))
{
XLogRecPtr flushptr;
- /* start at current insert position */
- restart_lsn = GetXLogInsertRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
-
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
/* and make sure it's fsynced to disk */
XLogFlush(flushptr);
}
- else
- {
- restart_lsn = GetRedoRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
- }
/* prevent WAL removal as fast as possible */
ReplicationSlotsComputeRequiredLSN();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d5c9bc31d8..d8308d7d27 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2878,10 +2878,12 @@ XLogSendLogical(void)
* If first time through in this session, initialize flushPtr. Otherwise,
* we only need to update flushPtr if EndRecPtr is past it.
*/
- if (flushPtr == InvalidXLogRecPtr)
- flushPtr = GetFlushRecPtr();
- else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
- flushPtr = GetFlushRecPtr();
+ if (flushPtr == InvalidXLogRecPtr ||
+ logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+ {
+ flushPtr = (am_cascading_walsender ?
+ GetStandbyFlushRecPtr() : GetFlushRecPtr());
+ }
/* If EndRecPtr is still past our flushPtr, it means we caught up. */
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 221af87e71..ddc167d2c9 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -324,6 +324,7 @@ extern Size XLOGShmemSize(void);
extern void XLOGShmemInit(void);
extern void BootStrapXLOG(void);
extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevel(void);
extern void StartupXLOG(void);
extern void ShutdownXLOG(int code, Datum arg);
extern void InitXLOGAccess(void);
--
2.25.1
From 91a987c32179a692e8af7be1646746a93d2f8f56 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <[email protected]>
Date: Thu, 16 Jan 2020 10:05:16 +0530
Subject: [PATCH v7 2/5] Add info in WAL records in preparation for logical
slot conflict handling.
When a WAL replay on standby indicates that a catalog table tuple is
to be deleted by an xid that is greater than a logical slot's
catalog_xmin, then that means the slot's catalog_xmin conflicts with
the xid, and we need to handle the conflict. While subsequent commits
will do the actual conflict handling, this commit adds a new field
onCatalogTable in such WAL records, that is true for catalog tables,
so as to arrange for conflict handling.
Andres Freund.
---
src/backend/access/gist/gist.c | 2 +-
src/backend/access/gist/gistbuild.c | 2 +-
src/backend/access/gist/gistutil.c | 4 ++--
src/backend/access/gist/gistxlog.c | 4 +++-
src/backend/access/hash/hashinsert.c | 2 ++
src/backend/access/heap/heapam.c | 10 +++++++---
src/backend/access/heap/vacuumlazy.c | 2 +-
src/backend/access/heap/visibilitymap.c | 2 +-
src/backend/access/nbtree/nbtpage.c | 4 ++++
src/backend/access/spgist/spgvacuum.c | 8 ++++++++
src/backend/utils/cache/lsyscache.c | 16 ++++++++++++++++
src/include/access/gist_private.h | 6 +++---
src/include/access/gistxlog.h | 3 ++-
src/include/access/hash_xlog.h | 1 +
src/include/access/heapam_xlog.h | 8 ++++++--
src/include/access/nbtxlog.h | 2 ++
src/include/access/spgxlog.h | 1 +
src/include/utils/lsyscache.h | 1 +
src/include/utils/rel.h | 9 +++++++++
19 files changed, 71 insertions(+), 16 deletions(-)
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 3f2b416ce1..a71d914384 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -344,7 +344,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
for (; ptr; ptr = ptr->next)
{
/* Allocate new page */
- ptr->buffer = gistNewBuffer(rel);
+ ptr->buffer = gistNewBuffer(heapRel, rel);
GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
ptr->page = BufferGetPage(ptr->buffer);
ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index 9d3fa9c3b7..c6f0153d7b 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -290,7 +290,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
Page page;
/* initialize the root page */
- buffer = gistNewBuffer(index);
+ buffer = gistNewBuffer(heap, index);
Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
page = BufferGetPage(buffer);
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index 615b5ade23..6169d6b294 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -823,7 +823,7 @@ gistcheckpage(Relation rel, Buffer buf)
* Caller is responsible for initializing the page by calling GISTInitBuffer
*/
Buffer
-gistNewBuffer(Relation r)
+gistNewBuffer(Relation heapRel, Relation r)
{
Buffer buffer;
bool needLock;
@@ -867,7 +867,7 @@ gistNewBuffer(Relation r)
* page's deleteXid.
*/
if (XLogStandbyInfoActive() && RelationNeedsWAL(r))
- gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page));
+ gistXLogPageReuse(heapRel, r, blkno, GistPageGetDeleteXid(page));
return buffer;
}
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 91b3e11182..6580c98cb8 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -616,7 +616,8 @@ gistXLogAssignLSN(void)
* Write XLOG record about reuse of a deleted page.
*/
void
-gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemovedXid)
+gistXLogPageReuse(Relation heapRel, Relation rel,
+ BlockNumber blkno, FullTransactionId latestRemovedXid)
{
gistxlogPageReuse xlrec_reuse;
@@ -627,6 +628,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemov
*/
/* XLOG stuff */
+ xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(heapRel);
xlrec_reuse.node = rel->rd_node;
xlrec_reuse.block = blkno;
xlrec_reuse.latestRemovedFullXid = latestRemovedXid;
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index 2ebe671967..e6cef08330 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -17,6 +17,7 @@
#include "access/hash.h"
#include "access/hash_xlog.h"
+#include "catalog/catalog.h"
#include "miscadmin.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
@@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
xl_hash_vacuum_one_page xlrec;
XLogRecPtr recptr;
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel);
xlrec.latestRemovedXid = latestRemovedXid;
xlrec.ntuples = ndeletable;
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a9583f3103..65bce76277 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7144,12 +7144,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel,
* see comments for vacuum_log_cleanup_info().
*/
XLogRecPtr
-log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
+log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid)
{
xl_heap_cleanup_info xlrec;
XLogRecPtr recptr;
- xlrec.node = rnode;
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
+ xlrec.node = rel->rd_node;
xlrec.latestRemovedXid = latestRemovedXid;
XLogBeginInsert();
@@ -7185,6 +7186,7 @@ log_heap_clean(Relation reln, Buffer buffer,
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
xlrec.latestRemovedXid = latestRemovedXid;
xlrec.nredirected = nredirected;
xlrec.ndead = ndead;
@@ -7235,6 +7237,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
/* nor when there are no tuples to freeze */
Assert(ntuples > 0);
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
xlrec.cutoff_xid = cutoff_xid;
xlrec.ntuples = ntuples;
@@ -7265,7 +7268,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
* heap_buffer, if necessary.
*/
XLogRecPtr
-log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
TransactionId cutoff_xid, uint8 vmflags)
{
xl_heap_visible xlrec;
@@ -7275,6 +7278,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
Assert(BufferIsValid(heap_buffer));
Assert(BufferIsValid(vm_buffer));
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
xlrec.cutoff_xid = cutoff_xid;
xlrec.flags = vmflags;
XLogBeginInsert();
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 25f2d5df1b..4829ed9293 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -718,7 +718,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
* No need to write the record at all unless it contains a valid value
*/
if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
- (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+ (void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid);
}
/*
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index b1072183bc..9819977688 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -282,7 +282,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
if (XLogRecPtrIsInvalid(recptr))
{
Assert(!InRecovery);
- recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
+ recptr = log_heap_visible(rel, heapBuf, vmBuf,
cutoff_xid, flags);
/*
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 793434c026..921aa75ac6 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -32,6 +32,7 @@
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
+#include "utils/lsyscache.h"
#include "utils/memdebug.h"
#include "utils/snapmgr.h"
@@ -768,6 +769,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX
*/
/* XLOG stuff */
+ xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
xlrec_reuse.node = rel->rd_node;
xlrec_reuse.block = blkno;
xlrec_reuse.latestRemovedXid = latestRemovedXid;
@@ -1323,6 +1325,8 @@ _bt_delitems_delete(Relation rel, Buffer buf,
XLogRecPtr recptr;
xl_btree_delete xlrec_delete;
+ xlrec_delete.onCatalogTable =
+ RelationIsAccessibleInLogicalDecoding(heapRel);
xlrec_delete.latestRemovedXid = latestRemovedXid;
xlrec_delete.ndeleted = ndeletable;
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index e1c58933f9..bb771b2fcf 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -27,6 +27,7 @@
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
/* Entry in pending-list of TIDs we need to revisit */
@@ -503,6 +504,13 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
spgxlogVacuumRedirect xlrec;
GlobalVisState *vistest;
+ /*
+ * There is no chance of endless recursion even when we are doing catalog
+ * accesses here, because spgist is never used for catalogs. See the
+ * comments in RelationIsAccessibleInLogicalDecoding().
+ */
+ xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid);
+
xlrec.nToPlaceholder = 0;
xlrec.newestRedirectXid = InvalidTransactionId;
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index 204bcd03c0..92ce304447 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -18,7 +18,9 @@
#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
+#include "access/table.h"
#include "bootstrap/bootstrap.h"
+#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_am.h"
#include "catalog/pg_amop.h"
@@ -2035,6 +2037,20 @@ get_rel_persistence(Oid relid)
return result;
}
+bool
+get_rel_logical_catalog(Oid relid)
+{
+ bool res;
+ Relation rel;
+
+ /* assume previously locked */
+ rel = table_open(relid, NoLock);
+ res = RelationIsAccessibleInLogicalDecoding(rel);
+ table_close(rel, NoLock);
+
+ return res;
+}
+
/* ---------- TRANSFORM CACHE ---------- */
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index b68c01a5f2..e2ba53ef74 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -439,8 +439,8 @@ extern XLogRecPtr gistXLogPageDelete(Buffer buffer,
FullTransactionId xid, Buffer parentBuffer,
OffsetNumber downlinkOffset);
-extern void gistXLogPageReuse(Relation rel, BlockNumber blkno,
- FullTransactionId latestRemovedXid);
+extern void gistXLogPageReuse(Relation heapRel, Relation rel,
+ BlockNumber blkno, FullTransactionId latestRemovedXid);
extern XLogRecPtr gistXLogUpdate(Buffer buffer,
OffsetNumber *todelete, int ntodelete,
@@ -484,7 +484,7 @@ extern bool gistproperty(Oid index_oid, int attno,
extern bool gistfitpage(IndexTuple *itvec, int len);
extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace);
extern void gistcheckpage(Relation rel, Buffer buf);
-extern Buffer gistNewBuffer(Relation r);
+extern Buffer gistNewBuffer(Relation heapRel, Relation r);
extern bool gistPageRecyclable(Page page);
extern void gistfillbuffer(Page page, IndexTuple *itup, int len,
OffsetNumber off);
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 673afee1e1..723867bf15 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -49,9 +49,9 @@ typedef struct gistxlogPageUpdate
*/
typedef struct gistxlogDelete
{
+ bool onCatalogTable;
TransactionId latestRemovedXid;
uint16 ntodelete; /* number of deleted offsets */
-
/*
* In payload of blk 0 : todelete OffsetNumbers
*/
@@ -97,6 +97,7 @@ typedef struct gistxlogPageDelete
*/
typedef struct gistxlogPageReuse
{
+ bool onCatalogTable;
RelFileNode node;
BlockNumber block;
FullTransactionId latestRemovedFullXid;
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index d1aa6daa40..b44892cc48 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -250,6 +250,7 @@ typedef struct xl_hash_init_bitmap_page
*/
typedef struct xl_hash_vacuum_one_page
{
+ bool onCatalogTable;
TransactionId latestRemovedXid;
int ntuples;
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 1525194112..0f111b733a 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -236,6 +236,7 @@ typedef struct xl_heap_update
*/
typedef struct xl_heap_clean
{
+ bool onCatalogTable;
TransactionId latestRemovedXid;
uint16 nredirected;
uint16 ndead;
@@ -251,6 +252,7 @@ typedef struct xl_heap_clean
*/
typedef struct xl_heap_cleanup_info
{
+ bool onCatalogTable;
RelFileNode node;
TransactionId latestRemovedXid;
} xl_heap_cleanup_info;
@@ -331,6 +333,7 @@ typedef struct xl_heap_freeze_tuple
*/
typedef struct xl_heap_freeze_page
{
+ bool onCatalogTable;
TransactionId cutoff_xid;
uint16 ntuples;
} xl_heap_freeze_page;
@@ -345,6 +348,7 @@ typedef struct xl_heap_freeze_page
*/
typedef struct xl_heap_visible
{
+ bool onCatalogTable;
TransactionId cutoff_xid;
uint8 flags;
} xl_heap_visible;
@@ -394,7 +398,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
extern const char *heap2_identify(uint8 info);
extern void heap_xlog_logical_rewrite(XLogReaderState *r);
-extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
+extern XLogRecPtr log_heap_cleanup_info(Relation rel,
TransactionId latestRemovedXid);
extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
OffsetNumber *redirected, int nredirected,
@@ -413,7 +417,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
bool *totally_frozen);
extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
xl_heap_freeze_tuple *xlrec_tp);
-extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags);
#endif /* HEAPAM_XLOG_H */
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 5c014bdc66..f5c60ff82f 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -186,6 +186,7 @@ typedef struct xl_btree_dedup
*/
typedef struct xl_btree_delete
{
+ bool onCatalogTable;
TransactionId latestRemovedXid;
uint32 ndeleted;
@@ -203,6 +204,7 @@ typedef struct xl_btree_delete
*/
typedef struct xl_btree_reuse_page
{
+ bool onCatalogTable;
RelFileNode node;
BlockNumber block;
TransactionId latestRemovedXid;
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index 63d3c63db2..83121c7266 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot
typedef struct spgxlogVacuumRedirect
{
+ bool onCatalogTable;
uint16 nToPlaceholder; /* number of redirects to make placeholders */
OffsetNumber firstPlaceholder; /* first placeholder tuple to remove */
TransactionId newestRedirectXid; /* newest XID of removed redirects */
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index a990d11ea8..f7816542a4 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -138,6 +138,7 @@ extern char get_rel_relkind(Oid relid);
extern bool get_rel_relispartition(Oid relid);
extern Oid get_rel_tablespace(Oid relid);
extern char get_rel_persistence(Oid relid);
+extern bool get_rel_logical_catalog(Oid relid);
extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes);
extern Oid get_transform_tosql(Oid typid, Oid langid, List *trftypes);
extern bool get_typisdefined(Oid typid);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index c5ffea40f2..6f5ffbfd27 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -16,6 +16,7 @@
#include "access/tupdesc.h"
#include "access/xlog.h"
+#include "catalog/catalog.h"
#include "catalog/pg_class.h"
#include "catalog/pg_index.h"
#include "catalog/pg_publication.h"
@@ -345,6 +346,9 @@ typedef struct StdRdOptions
* RelationIsUsedAsCatalogTable
* Returns whether the relation should be treated as a catalog table
* from the pov of logical decoding. Note multiple eval of argument!
+ * This definition should not invoke anything that performs catalog
+ * access; otherwise it may cause infinite recursion. Check the comments
+ * in RelationIsAccessibleInLogicalDecoding() for details.
*/
#define RelationIsUsedAsCatalogTable(relation) \
((relation)->rd_options && \
@@ -616,6 +620,11 @@ typedef struct ViewOptions
* RelationIsAccessibleInLogicalDecoding
* True if we need to log enough information to have access via
* decoding snapshot.
+ * This definition should not invoke anything that performs catalog
+ * access. Otherwise, e.g. logging a WAL entry for catalog relation may
+ * invoke this function, which will in turn do catalog access, which may
+ * in turn cause another similar WAL entry to be logged, leading to
+ * infinite recursion.
*/
#define RelationIsAccessibleInLogicalDecoding(relation) \
(XLogLogicalInfoActive() && \
--
2.25.1
From 38a41501bb640137d1778c5c27989efa05e80783 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?=
<[email protected]>
Date: Mon, 14 Dec 2020 13:46:20 -0300
Subject: [PATCH v7 4/5] New TAP test for logical decoding on standby.
This test was originally written by Craig Ringer, then
extended/modified by me, to test various slot conflict scenarios.
Authors: Craig Ringer, Amit Khandekar.
---
src/test/perl/PostgresNode.pm | 37 +++
.../t/022_standby_logical_decoding_xmins.pl | 272 ++++++++++++++++++
.../023_standby_logical_decoding_conflicts.pl | 216 ++++++++++++++
3 files changed, 525 insertions(+)
create mode 100644 src/test/recovery/t/022_standby_logical_decoding_xmins.pl
create mode 100644 src/test/recovery/t/023_standby_logical_decoding_conflicts.pl
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 9667f7667e..1d62700ca6 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -2223,6 +2223,43 @@ sub pg_recvlogical_upto
=pod
+=item $node->create_logical_slot_on_standby(self, master, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+ my ($self, $master, $slot_name, $dbname) = @_;
+ my ($stdout, $stderr);
+
+ my $handle;
+
+ $handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+ # Once the slot's restart_lsn is set, the standby looks for an
+ # xl_running_xacts WAL record from restart_lsn onwards. So first wait
+ # until the slot's restart_lsn has been computed.
+
+ $self->poll_query_until(
+ 'postgres', qq[
+ SELECT restart_lsn IS NOT NULL
+ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+ ]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+ # Now arrange for the xl_running_xacts record that pg_recvlogical is
+ # waiting for, by forcing a checkpoint on the primary.
+ $master->safe_psql('postgres', 'CHECKPOINT');
+
+ $handle->finish();
+
+ is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created')
+ or die "could not create slot $slot_name: " . (defined $stderr ? $stderr : '');
+}
+
+=pod
+
=back
=cut
diff --git a/src/test/recovery/t/022_standby_logical_decoding_xmins.pl b/src/test/recovery/t/022_standby_logical_decoding_xmins.pl
new file mode 100644
index 0000000000..d654d79526
--- /dev/null
+++ b/src/test/recovery/t/022_standby_logical_decoding_xmins.pl
@@ -0,0 +1,272 @@
+# logical decoding on a standby : ensure xmins are appropriately updated
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 23;
+use RecursiveCopy;
+use File::Copy;
+use Time::HiRes qw(usleep);
+
+my ($stdout, $stderr, $ret, $slot);
+
+my $node_master = get_new_node('master');
+my $node_standby = get_new_node('standby');
+
+# Name for the physical slot on master
+my $master_slotname = 'master_physical';
+# Name for the logical slot on standby
+my $standby_slotname = 'standby_logical';
+
+# Wait until the boolean SQL expression $check_expr, evaluated against the
+# pg_replication_slots row of $slotname on $node, is true; die on timeout.
+sub wait_for_xmins
+{
+ my ($node, $slotname, $check_expr) = @_;
+
+ $node->poll_query_until(
+ 'postgres', qq[
+ SELECT $check_expr
+ FROM pg_catalog.pg_replication_slots
+ WHERE slot_name = '$slotname';
+ ]) or die "Timed out waiting for slot \"$slotname\" to satisfy: $check_expr";
+}
+
+
+########################
+# Initialize master node
+########################
+
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->safe_psql('postgres', qq[SELECT * FROM pg_create_physical_replication_slot('$master_slotname');]);
+my $backup_name = 'b1';
+$node_master->backup($backup_name);
+
+# A freshly created physical slot has neither xmin nor catalog_xmin set
+$slot = $node_master->slot($master_slotname);
+is($slot->{'xmin'}, '', "xmin null");
+is($slot->{'catalog_xmin'}, '', "catalog_xmin null");
+
+#########################
+# Initialize standby node
+#########################
+
+$node_standby->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+ qq[primary_slot_name = '$master_slotname']);
+$node_standby->start;
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+
+################################
+# xmin/catalog_xmin verification before and after standby-logical-slot creation.
+################################
+
+# With hot_standby_feedback off, xmin and catalog_xmin must still be null
+$slot = $node_master->slot($master_slotname);
+is($slot->{'xmin'}, '', "xmin null after standby join");
+is($slot->{'catalog_xmin'}, '', "catalog_xmin null after standby join");
+
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_master, $master_slotname, "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+# Create new slots on the standby, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+$node_standby->create_logical_slot_on_standby($node_master, $standby_slotname, 'postgres');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+# Now that the slot is created on the standby, the master's physical slot should
+# get non-NULL xmin and catalog_xmin, but only once hot standby feedback arrives;
+# the standby's logical slot itself has a catalog_xmin but no xmin.
+wait_for_xmins($node_master, $master_slotname, "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+
+$slot = $node_standby->slot($standby_slotname);
+is($slot->{'xmin'}, '', "logical xmin null");
+isnt($slot->{'catalog_xmin'}, '', "logical catalog_xmin not null");
+
+
+################################
+# Standby logical slot should be able to fetch the table changes even when the
+# table is dropped.
+################################
+
+$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('postgres', q[INSERT INTO test_table(blah) values ('itworks')]);
+$node_master->safe_psql('postgres', 'DROP TABLE test_table');
+$node_master->safe_psql('postgres', 'VACUUM');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+# Should show the inserts even when the table is dropped on master
+($ret, $stdout, $stderr) = $node_standby->psql('postgres', qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+ or die 'cannot continue if slot replay fails';
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+$slot = $node_master->slot($master_slotname);
+isnt($slot->{'xmin'}, '', "physical xmin not null");
+my $saved_physical_catalog_xmin = $slot->{'catalog_xmin'};
+isnt($saved_physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+$slot = $node_standby->slot($standby_slotname);
+is($slot->{'xmin'}, '', "logical xmin null");
+my $saved_logical_catalog_xmin = $slot->{'catalog_xmin'};
+isnt($saved_logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+
+################################
+# Catalog xmins should advance after standby logical slot fetches the changes.
+################################
+
+# Create catalog changes around a batch of inserts. Ideally only catalog_xmin
+# would be held back, but hs_feedback uses the physical slot, so xmin is too.
+$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_1();]);
+$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
+for my $i (0 .. 2000)
+{
+ $node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('postgres', 'VACUUM');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+cmp_ok($node_standby->slot($standby_slotname)->{'catalog_xmin'}, "==",
+ $saved_logical_catalog_xmin,
+ "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_standby->psql('postgres',
+ qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+# logical slot catalog_xmin on standby should advance after pg_logical_slot_get_changes
+wait_for_xmins($node_standby, $standby_slotname,
+ "catalog_xmin::varchar::int > ${saved_logical_catalog_xmin}");
+$slot = $node_standby->slot($standby_slotname);
+my $new_logical_catalog_xmin = $slot->{'catalog_xmin'};
+is($slot->{'xmin'}, '', "logical xmin null");
+
+# hot standby feedback should advance master's phys catalog_xmin now that the
+# standby's slot doesn't hold it down as far.
+# But master's phys catalog_xmin should not go past the standby's logical slot's
+# catalog_xmin, even while master's phys xmin advances.
+#
+# First, make sure master's xmin is advanced. This happens on hot standby
+# feedback. So this check for master's xmin advance also makes sure hot standby
+# feedback has reached the master, which is required for the subsequent
+# catalog_xmin test.
+my $temp_phys_xmin = $node_master->slot($master_slotname)->{'xmin'};
+$node_master->safe_psql('postgres', 'SELECT txid_current()');
+wait_for_xmins($node_master, $master_slotname,
+ "xmin::varchar::int > ${temp_phys_xmin}");
+$slot = $node_master->slot($master_slotname);
+# Now check that the master's phys catalog_xmin has advanced but not beyond
+# standby's logical catalog_xmin
+cmp_ok($slot->{'catalog_xmin'}, ">", $saved_physical_catalog_xmin,
+ 'upstream physical slot catalog_xmin has advanced with hs_feedback on');
+cmp_ok($slot->{'catalog_xmin'}, "==", $new_logical_catalog_xmin,
+ 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+
+######################
+# Upstream oldestXid should not go past downstream catalog_xmin
+######################
+
+# First burn some xids on the master, so we push the master's nextXid
+# ahead.
+foreach my $i (1 .. 100)
+{
+ $node_master->safe_psql('postgres', 'SELECT txid_current()');
+}
+
+# Force vacuum freeze in each database on the master and ensure its oldestXid
+# doesn't advance past our needed catalog_xmin. The only way we have visibility
+# into that is to force a checkpoint and read it back from pg_controldata.
+$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+foreach my $dbname ('template1', 'postgres', 'template0')
+{
+ $node_master->safe_psql($dbname, 'VACUUM FREEZE');
+}
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+ or die "pg_controldata failed with $?";
+my @checkpoint = split('\n', $stdout);
+my $oldestXid = '';
+foreach my $line (@checkpoint)
+{
+ if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+ {
+ $oldestXid = $1;
+ }
+}
+die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+cmp_ok($oldestXid, "<=", $node_standby->slot($standby_slotname)->{'catalog_xmin'},
+ 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+
+$node_master->safe_psql('postgres',
+ "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+
+##################################################
+# Drop slot
+# Make sure standby slots are droppable, and properly clear the upstream's xmin
+##################################################
+
+is($node_standby->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$standby_slotname')]);
+
+is($node_standby->slot($standby_slotname)->{'slot_type'}, '', 'slot on standby dropped manually');
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. catalog_xmin should become NULL because we dropped
+# the logical slot.
+wait_for_xmins($node_master, $master_slotname,
+ "xmin IS NOT NULL AND catalog_xmin IS NULL");
diff --git a/src/test/recovery/t/023_standby_logical_decoding_conflicts.pl b/src/test/recovery/t/023_standby_logical_decoding_conflicts.pl
new file mode 100644
index 0000000000..d0c449338f
--- /dev/null
+++ b/src/test/recovery/t/023_standby_logical_decoding_conflicts.pl
@@ -0,0 +1,216 @@
+# logical decoding on a standby : test conflict recovery; and other tests that
+# verify slots get dropped as expected.
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 24;
+use RecursiveCopy;
+use File::Copy;
+use Time::HiRes qw(usleep);
+
+my ($stdout, $stderr, $handle);
+
+my $node_master = get_new_node('master');
+my $node_standby = get_new_node('standby');
+
+# Name for the physical slot on master
+my $master_slotname = 'master_physical';
+
+# Wait until the boolean SQL expression $check_expr, evaluated against the
+# pg_replication_slots row of $slotname on $node, is true; die on timeout.
+sub wait_for_xmins
+{
+ my ($node, $slotname, $check_expr) = @_;
+
+ $node->poll_query_until(
+ 'postgres', qq[
+ SELECT $check_expr
+ FROM pg_catalog.pg_replication_slots
+ WHERE slot_name = '$slotname';
+ ]) or die "Timed out waiting for slot \"$slotname\" to satisfy: $check_expr";
+}
+
+# Create the logical slots 'dropslot' and 'activeslot', in that order, in testdb on the standby.
+sub create_logical_slots
+{
+ my @slot_names = ('dropslot', 'activeslot');
+ $node_standby->create_logical_slot_on_standby($node_master, $_, 'testdb') foreach @slot_names;
+}
+
+# Start pg_recvlogical on 'activeslot' and wait (bounded by poll_query_until's
+# timeout) until the slot is acquired; returns the IPC::Run handle.
+sub make_slot_active
+{
+ # make sure activeslot is in use
+ print "starting pg_recvlogical\n";
+ my $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node_standby->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+
+ # Poll instead of looping forever: if pg_recvlogical never acquires the
+ # slot, die with its stderr rather than hanging the whole test run.
+ $node_standby->poll_query_until('postgres', qq[
+ SELECT active_pid IS NOT NULL FROM pg_catalog.pg_replication_slots
+ WHERE slot_name = 'activeslot'
+ ]) or die "timed out waiting for activeslot to become active: $stderr";
+ return $slot_user_handle;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'dropslot'.
+sub check_slots_dropped
+{
+ my ($slot_user_handle) = @_;
+
+ is($node_standby->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped');
+ is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+ # our client should've terminated in response to the walsender error;
+ # trap any exception from finish() and inspect the exit status in $? instead
+ eval { $slot_user_handle->finish; };
+ my $return = $?;
+ cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+ SKIP:
+ {
+ # keep the planned test count stable even if pg_recvlogical exited 0
+ skip "pg_recvlogical exited successfully, no error to check", 2 unless $return;
+ like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict');
+ like($stderr, qr/must be dropped/, 'recvlogical error detail');
+ }
+ return 0;
+}
+
+
+########################
+# Initialize master node
+########################
+
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->safe_psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$master_slotname');]);
+my $backup_name = 'b1';
+$node_master->backup($backup_name);
+
+#########################
+# Initialize standby node
+#########################
+
+$node_standby->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+ qq[primary_slot_name = '$master_slotname']);
+$node_standby->start;
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+
+##################################################
+# Recovery conflict: Drop conflicting slots, including in-use slots
+# Scenario 1 : hot_standby_feedback off
+##################################################
+
+create_logical_slots();
+
+# One way to reproduce a recovery conflict is to run VACUUM FULL with
+# hot_standby_feedback turned off on the standby.
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_standby->restart;
+# ensure walreceiver feedback is off by waiting for the master slot's xmin and
+# catalog_xmin to both become NULL, as expected with hs_feedback off
+wait_for_xmins($node_master, $master_slotname,
+ "xmin IS NULL AND catalog_xmin IS NULL");
+
+$handle = make_slot_active();
+
+# This should trigger the conflict
+$node_master->safe_psql('testdb', 'VACUUM FULL');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+check_slots_dropped($handle);
+
+# Turn hot_standby_feedback back on
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_master, $master_slotname,
+ "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Drop conflicting slots, including in-use slots
+# Scenario 2 : wal_level on master lowered below logical
+##################################################
+
+create_logical_slots();
+
+$handle = make_slot_active();
+
+# Lower master's wal_level to replica; replaying that change drops the slots.
+$node_master->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_master->restart;
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+check_slots_dropped($handle);
+
+# Restore master wal_level
+$node_master->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_master->restart;
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+
+##################################################
+# DROP DATABASE should drop its slots, including active slots.
+##################################################
+
+create_logical_slots();
+$handle = make_slot_active();
+
+# Create a slot in a database that is not being dropped. This slot should
+# survive the DROP DATABASE.
+$node_standby->create_logical_slot_on_standby($node_master, 'otherslot', 'postgres');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+is($node_standby->safe_psql('postgres',
+ q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+ 'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+ 'otherslot on standby not dropped');
+
+# Cleanup: manually drop the slot that survived the DROP DATABASE.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
--
2.25.1
From 6865ce782d8f2416255598b935fded887660a33b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?=
<[email protected]>
Date: Mon, 14 Dec 2020 13:47:41 -0300
Subject: [PATCH v7 5/5] Doc changes describing details about logical decoding
---
doc/src/sgml/logicaldecoding.sgml | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 813a037fac..003520dc02 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -248,6 +248,24 @@ $ pg_recvlogical -d postgres --slot=test --drop-slot
may consume changes from a slot at any given time.
</para>
+ <para>
+ A logical replication slot can also be created on a hot standby. To prevent
+ <command>VACUUM</command> from removing required rows from the system
+ catalogs, <varname>hot_standby_feedback</varname> should be set on the
+ standby. Even so, if any required rows get removed, the slot is dropped.
+ Existing logical slots on the standby are also dropped if <varname>wal_level</varname>
+ on the primary is reduced below <literal>logical</literal>.
+ </para>
+
+ <para>
+ Creating a logical slot requires building a historic snapshot, which needs
+ information about all currently running transactions. On the primary this
+ information is available directly, but a standby has to obtain it from the
+ WAL sent by the primary. Slot creation on a standby may therefore wait until
+ some activity happens on the primary; if the primary is idle, creating a
+ logical slot on the standby may take a noticeable amount of time.
+ </para>
+
<caution>
<para>
Replication slots persist across crashes and know nothing about the state
--
2.25.1
From 873fd297349c309c26245afcd09e92ba09dd76fc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?=
<[email protected]>
Date: Mon, 14 Dec 2020 11:52:02 -0300
Subject: [PATCH v7 3/5] Handle logical slot conflicts on standby.
During WAL replay on standby, when slot conflict is identified,
drop such slots. Also do the same thing if wal_level on master
is reduced to below logical and there are existing logical slots
on standby. Introduce a new ProcSignalReason value for slot
conflict recovery. Arrange for a new pg_stat_database_conflicts
field: confl_logicalslot.
Amit Khandekar, reviewed by Andres Freund.
---
doc/src/sgml/monitoring.sgml | 10 ++
src/backend/access/gist/gistxlog.c | 4 +-
src/backend/access/hash/hash_xlog.c | 3 +-
src/backend/access/heap/heapam.c | 13 +-
src/backend/access/nbtree/nbtxlog.c | 4 +-
src/backend/access/spgist/spgxlog.c | 1 +
src/backend/access/transam/xlog.c | 14 ++
src/backend/catalog/system_views.sql | 1 +
src/backend/postmaster/pgstat.c | 4 +
src/backend/replication/slot.c | 188 +++++++++++++++++++++++++++
src/backend/storage/ipc/procarray.c | 4 +
src/backend/storage/ipc/procsignal.c | 3 +
src/backend/storage/ipc/standby.c | 4 +-
src/backend/tcop/postgres.c | 22 ++++
src/backend/utils/adt/pgstatfuncs.c | 16 +++
src/include/catalog/pg_proc.dat | 5 +
src/include/pgstat.h | 1 +
src/include/replication/slot.h | 2 +
src/include/storage/procsignal.h | 1 +
src/include/storage/standby.h | 2 +-
src/test/regress/expected/rules.out | 1 +
21 files changed, 294 insertions(+), 9 deletions(-)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 52a69a5366..788f17c3a4 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -3842,6 +3842,16 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
deadlocks
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_logicalslot</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of queries in this database that have been canceled due to
+ conflicts with logical replication slots
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 6580c98cb8..5c4a3cdc5b 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
- ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+ xldata->onCatalogTable, rnode);
}
if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
@@ -413,6 +414,7 @@ gistRedoPageReuse(XLogReaderState *record)
latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid);
ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+ xlrec->onCatalogTable,
xlrec->node);
}
}
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index 3c60677662..5767890fa4 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
RelFileNode rnode;
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
- ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+ xldata->onCatalogTable, rnode);
}
action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 65bce76277..46c79def07 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7702,7 +7702,8 @@ heap_xlog_cleanup_info(XLogReaderState *record)
xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
if (InHotStandby)
- ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
+ ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+ xlrec->onCatalogTable, xlrec->node);
/*
* Actual operation is a no-op. Record type exists to provide a means for
@@ -7738,7 +7739,8 @@ heap_xlog_clean(XLogReaderState *record)
* latestRemovedXid is invalid, skip conflict processing.
*/
if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid))
- ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+ xlrec->onCatalogTable, rnode);
/*
* If we have a full-page image, restore it (using a cleanup lock) and
@@ -7834,7 +7836,9 @@ heap_xlog_visible(XLogReaderState *record)
* rather than killing the transaction outright.
*/
if (InHotStandby)
- ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode);
+ ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid,
+ xlrec->onCatalogTable,
+ rnode);
/*
* Read the heap page, if it still exists. If the heap file has dropped or
@@ -7971,7 +7975,8 @@ heap_xlog_freeze_page(XLogReaderState *record)
TransactionIdRetreat(latestRemovedXid);
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
- ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+ xlrec->onCatalogTable, rnode);
}
if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 5135b800af..46591d54fa 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -662,7 +662,8 @@ btree_xlog_delete(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
- ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+ xlrec->onCatalogTable, rnode);
}
/*
@@ -959,6 +960,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
if (InHotStandby)
{
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+ xlrec->onCatalogTable,
xlrec->node);
}
}
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index 999d0ca15d..3782b3a69a 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+ xldata->onCatalogTable,
node);
}
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 609edbaca6..2c285a6964 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -10255,6 +10255,20 @@ xlog_redo(XLogReaderState *record)
/* Update our copy of the parameters in pg_control */
memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
+ /*
+ * Drop logical slots if we are in hot standby and the primary does not
+ * have a WAL level sufficient for logical decoding. No need to search
+ * for potentially conflicting logically slots if standby is running
+ * with wal_level lower than logical, because in that case, we would
+ * have either disallowed creation of logical slots or dropped existing
+ * ones.
+ */
+ if (InRecovery && InHotStandby &&
+ xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+ wal_level >= WAL_LEVEL_LOGICAL)
+ ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId,
+ gettext_noop("Logical decoding on standby requires wal_level >= logical on master."));
+
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->MaxConnections = xlrec.MaxConnections;
ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b140c210bc..b67d0f6ef9 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -938,6 +938,7 @@ CREATE VIEW pg_stat_database_conflicts AS
pg_stat_get_db_conflict_tablespace(D.oid) AS confl_tablespace,
pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
+ pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_logicalslot,
pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
FROM pg_database D;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 6b60f293e9..b52755084d 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4978,6 +4978,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
dbentry->n_conflict_tablespace = 0;
dbentry->n_conflict_lock = 0;
dbentry->n_conflict_snapshot = 0;
+ dbentry->n_conflict_logicalslot = 0;
dbentry->n_conflict_bufferpin = 0;
dbentry->n_conflict_startup_deadlock = 0;
dbentry->n_temp_files = 0;
@@ -6846,6 +6847,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
dbentry->n_conflict_snapshot++;
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ dbentry->n_conflict_logicalslot++;
+ break;
case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
dbentry->n_conflict_bufferpin++;
break;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 8a0f51907b..faceab0fa3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,6 +46,7 @@
#include "pgstat.h"
#include "replication/slot.h"
#include "storage/fd.h"
+#include "storage/lock.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
@@ -103,6 +104,7 @@ static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static void ReplicationSlotDropConflicting(ReplicationSlot *slot);
/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
@@ -711,6 +713,64 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
LWLockRelease(ReplicationSlotAllocationLock);
}
+/*
+ * Permanently drop a conflicting replication slot. If it is currently held
+ * by another backend, send that backend a recovery conflict signal and retry.
+ */
+static void
+ReplicationSlotDropConflicting(ReplicationSlot *slot)
+{
+ pid_t active_pid;
+ PGPROC *proc;
+ VirtualTransactionId vxid;
+
+ ConditionVariablePrepareToSleep(&slot->active_cv);
+ while (1)
+ {
+ SpinLockAcquire(&slot->mutex);
+ active_pid = slot->active_pid;
+ if (active_pid == 0)
+ active_pid = slot->active_pid = MyProcPid;
+ SpinLockRelease(&slot->mutex);
+
+ /* Drop the acquired slot, unless it is acquired by another backend */
+ if (active_pid == MyProcPid)
+ {
+ elog(DEBUG1, "acquired conflicting slot, now dropping it");
+ ReplicationSlotDropPtr(slot);
+ break;
+ }
+
+ /*
+ * Find the backend holding the slot and cancel it, even if its
+ * vxid.localTransactionId is invalid: that is the only way to make it
+ * release the slot. If it has already exited, just retry, as another
+ * backend may have acquired the slot meanwhile.
+ */
+ SetInvalidVirtualTransactionId(vxid);
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+ proc = BackendPidGetProcWithLock(active_pid);
+ if (proc)
+ GET_VXID_FROM_PGPROC(vxid, *proc);
+ LWLockRelease(ProcArrayLock);
+ if (vxid.backendId != InvalidBackendId)
+ {
+ elog(DEBUG1, "cancelling pid %d (backendId: %d) for releasing slot",
+ active_pid, vxid.backendId);
+ CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+ }
+
+ /*
+ * Sleep with a timeout and recheck: the cancel may have been missed if
+ * the holder's vxid changed, and an untimed wait could then hang forever.
+ */
+ (void) ConditionVariableTimedSleep(&slot->active_cv, 10,
+ WAIT_EVENT_REPLICATION_SLOT_DROP);
+ }
+
+ ConditionVariableCancelSleep();
+}
+
/*
* Serialize the currently acquired slot's state from memory to disk, thereby
* guaranteeing the current state will survive a crash.
@@ -1141,12 +1201,25 @@ ReplicationSlotReserveWal(void)
{
XLogRecPtr flushptr;
+ /* start at current insert position */
+ restart_lsn = GetXLogInsertRecPtr();
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
+
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
/* and make sure it's fsynced to disk */
XLogFlush(flushptr);
}
+ else
+ {
+ restart_lsn = GetRedoRecPtr();
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
+ }
/* prevent WAL removal as fast as possible */
ReplicationSlotsComputeRequiredLSN();
@@ -1164,6 +1237,121 @@ ReplicationSlotReserveWal(void)
}
}
+/*
+ * Resolve recovery conflicts with logical slots.
+ *
+ * When xid is valid, rows older than xid might have been removed, so drop the
+ * logical slots of database dboid whose xmin or catalog_xmin is <= xid; the
+ * log detail is then built from those xid values.  When xid is invalid (e.g.
+ * the master's wal_level was set back to replica), drop all logical slots and
+ * use the caller-supplied conflict_reason as the log detail.
+ */
+void
+ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid,
+										char *conflict_reason)
+{
+	int			i;
+	bool		found_conflict = false;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	if (found_conflict)
+	{
+		CHECK_FOR_INTERRUPTS();
+		found_conflict = false;
+	}
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		char	   *detail = conflict_reason;	/* errdetail if we drop s */
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotControlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* We are only dealing with *logical* slot conflicts. */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* Invalid xid means caller is asking to drop all logical slots */
+		if (!TransactionIdIsValid(xid))
+			found_conflict = true;
+		else
+		{
+			TransactionId slot_xmin;
+			TransactionId slot_catalog_xmin;
+			bool		xmin_conflict;
+			bool		catalog_xmin_conflict;
+			StringInfoData conflict_str;
+
+			/* not our database, skip */
+			if (s->data.database != InvalidOid && s->data.database != dboid)
+				continue;
+
+			SpinLockAcquire(&s->mutex);
+			slot_xmin = s->data.xmin;
+			slot_catalog_xmin = s->data.catalog_xmin;
+			SpinLockRelease(&s->mutex);
+
+			xmin_conflict = TransactionIdIsValid(slot_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_xmin, xid);
+			catalog_xmin_conflict = TransactionIdIsValid(slot_catalog_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid);
+			if (!xmin_conflict && !catalog_xmin_conflict)
+				continue;		/* no conflict, no message */
+
+			/*
+			 * Build the conflict detail, which will look like:
+			 * "Slot conflicted with xid horizon which was being increased
+			 * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)."
+			 */
+			initStringInfo(&conflict_str);
+			appendStringInfo(&conflict_str, "%s %u (",
+							 _("Slot conflicted with xid horizon which was being increased to"),
+							 xid);
+			if (xmin_conflict)
+				appendStringInfo(&conflict_str, "slot xmin: %u", slot_xmin);
+			if (catalog_xmin_conflict)
+				appendStringInfo(&conflict_str, "%sslot catalog_xmin: %u",
+								 xmin_conflict ? ", " : "", slot_catalog_xmin);
+			appendStringInfoString(&conflict_str, ").");
+			found_conflict = true;
+			detail = conflict_str.data;
+		}
+
+		if (found_conflict)
+		{
+			NameData	slotname;
+
+			SpinLockAcquire(&s->mutex);
+			slotname = s->data.name;
+			SpinLockRelease(&s->mutex);
+
+			/* ReplicationSlotDropConflicting() will acquire the lock below */
+			LWLockRelease(ReplicationSlotControlLock);
+
+			ReplicationSlotDropConflicting(s);
+
+			ereport(LOG,
+					(errmsg("dropped conflicting slot %s", NameStr(slotname)),
+					 errdetail("%s", detail)));
+			if (detail != conflict_reason)
+				pfree(detail);
+
+			/* We released the lock above; so re-scan the slots. */
+			goto restart;
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
/*
* Mark any slot that points to an LSN older than the given segment
* as invalid; it requires WAL that's about to be removed.
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index ee912b9d5e..18c23927d5 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -3339,6 +3339,10 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
GET_VXID_FROM_PGPROC(procvxid, *proc);
+ /*
+ * Note: vxid.localTransactionId can be invalid, which means the
+ * request is to signal a backend that is not running a transaction.
+ */
if (procvxid.backendId == vxid.backendId &&
procvxid.localTransactionId == vxid.localTransactionId)
{
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index ffe67acea1..8047bcce63 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -579,6 +579,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
+ if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+ RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 4ea3cf1f5c..a66bfcebf1 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -23,6 +23,7 @@
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
@@ -300,7 +301,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
}
void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+ bool onCatalogTable, RelFileNode node)
{
VirtualTransactionId *backends;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 3679799e50..649f419903 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2465,6 +2465,9 @@ errdetail_recovery_conflict(void)
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
errdetail("User query might have needed to see row versions that must be removed.");
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ errdetail("User was using the logical slot that must be dropped.");
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
errdetail("User transaction caused buffer deadlock with recovery.");
break;
@@ -2933,6 +2936,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
case PROCSIG_RECOVERY_CONFLICT_LOCK:
case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ /*
+ * For conflicts that require a logical slot to be dropped, the
+ * requirement is for the signal receiver to release the slot,
+ * so that it could be dropped by the signal sender. So for
+ * normal backends, the transaction should be aborted, just
+ * like for other recovery conflicts. But if it's a walsender on
+ * the standby, it has to be killed so that it releases its
+ * acquired logical slot.
+ */
+ if (am_cascading_walsender &&
+ reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
+ MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
+ {
+ RecoveryConflictPending = true;
+ QueryCancelPending = true;
+ InterruptPending = true;
+ break;
+ }
/*
* If we aren't in a transaction any longer then ignore.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 6afe1b6f56..7a2da6780b 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1494,6 +1494,21 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS)
PG_RETURN_INT64(result);
}
+/* Count of recovery conflicts in a database caused by logical slots */
+Datum
+pg_stat_get_db_conflict_logicalslot(PG_FUNCTION_ARGS)
+{
+	Oid			db_oid = PG_GETARG_OID(0);
+	PgStat_StatDBEntry *entry = pgstat_fetch_stat_dbentry(db_oid);
+	int64		count = 0;
+
+	/* No stats entry for this database: report zero */
+	if (entry != NULL)
+		count = (int64) entry->n_conflict_logicalslot;
+
+	PG_RETURN_INT64(count);
+}
+
Datum
pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS)
{
@@ -1537,6 +1552,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
result = (int64) (dbentry->n_conflict_tablespace +
dbentry->n_conflict_lock +
dbentry->n_conflict_snapshot +
+ dbentry->n_conflict_logicalslot +
dbentry->n_conflict_bufferpin +
dbentry->n_conflict_startup_deadlock);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index e6c7b070f6..bd79bc2b20 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5389,6 +5389,11 @@
proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '3585',
+ descr => 'statistics: recovery conflicts in database caused by logical replication slot',
+ proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+ proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+ prosrc => 'pg_stat_get_db_conflict_logicalslot' },
{ oid => '3068',
descr => 'statistics: recovery conflicts in database caused by shared buffer pin',
proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5954068dec..b6627a7e76 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -687,6 +687,7 @@ typedef struct PgStat_StatDBEntry
PgStat_Counter n_conflict_tablespace;
PgStat_Counter n_conflict_lock;
PgStat_Counter n_conflict_snapshot;
+ PgStat_Counter n_conflict_logicalslot;
PgStat_Counter n_conflict_bufferpin;
PgStat_Counter n_conflict_startup_deadlock;
PgStat_Counter n_temp_files;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 63bab6967f..90027da054 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -217,4 +217,6 @@ extern void CheckPointReplicationSlots(void);
extern void CheckSlotRequirements(void);
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason);
+
#endif /* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 5cb39697f3..ffcf9e2d49 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -40,6 +40,7 @@ typedef enum
PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
PROCSIG_RECOVERY_CONFLICT_LOCK,
PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index faaf1d3817..bc3471777c 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void);
extern void ShutdownRecoveryTransactionEnvironment(void);
extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
- RelFileNode node);
+ bool onCatalogTable, RelFileNode node);
extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6293ab57bc..9d5bd887bf 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1860,6 +1860,7 @@ pg_stat_database_conflicts| SELECT d.oid AS datid,
pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace,
pg_stat_get_db_conflict_lock(d.oid) AS confl_lock,
pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot,
+ pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_logicalslot,
pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin,
pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock
FROM pg_database d;
--
2.25.1