On Sat, 6 Apr 2019 at 04:45, Andres Freund <[email protected]> wrote:
>
> Hi,
>
> Thanks for the new version of the patch. Btw, could you add Craig as a
> co-author in the commit message of the next version of the patch? Don't
> want to forget him.
I had put his name in the earlier patch. But now I have made it easier to spot.
>
> On 2019-04-05 17:08:39 +0530, Amit Khandekar wrote:
> > Regarding the test result failures, I could see that when we drop a
> > logical replication slot at standby server, then the catalog_xmin of
> > physical replication slot becomes NULL, whereas the test expects it to
> > be equal to xmin; and that's the reason a couple of test scenarios are
> > failing :
> >
> > ok 33 - slot on standby dropped manually
> > Waiting for replication conn replica's replay_lsn to pass '0/31273E0' on
> > master
> > done
> > not ok 34 - physical catalog_xmin still non-null
> > not ok 35 - xmin and catalog_xmin equal after slot drop
> > # Failed test 'xmin and catalog_xmin equal after slot drop'
> > # at t/016_logical_decoding_on_replica.pl line 272.
> > # got:
> > # expected: 2584
> >
> > I am not sure what is expected. What actually happens is : the
> > physical slot catalog_xmin remains NULL initially, but becomes
> > non-NULL after the logical replication slot is created on standby.
>
> That seems like the correct behaviour to me - why would we still have a
> catalog xmin if there's no logical slot?
Yeah. Maybe the earlier implementation behaved differently, and that is
why the catalog_xmin didn't become NULL there; I am not sure. Anyway, I
have changed this check. Details are in the following sections.
>
>
> > diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> > index 006446b..5785d2f 100644
> > --- a/src/backend/replication/slot.c
> > +++ b/src/backend/replication/slot.c
> > @@ -1064,6 +1064,85 @@ ReplicationSlotReserveWal(void)
> > }
> > }
> >
> > +void
> > +ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid)
> > +{
> > + int i;
> > + bool found_conflict = false;
> > +
> > + if (max_replication_slots <= 0)
> > + return;
> > +
> > +restart:
> > + if (found_conflict)
> > + {
> > + CHECK_FOR_INTERRUPTS();
> > + /*
> > + * Wait awhile for them to die so that we avoid flooding an
> > + * unresponsive backend when system is heavily loaded.
> > + */
> > + pg_usleep(100000);
> > + found_conflict = false;
> > + }
> > +
> > + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> > + for (i = 0; i < max_replication_slots; i++)
> > + {
> > + ReplicationSlot *s;
> > + NameData slotname;
> > + TransactionId slot_xmin;
> > + TransactionId slot_catalog_xmin;
> > +
> > + s = &ReplicationSlotCtl->replication_slots[i];
> > +
> > + /* cannot change while ReplicationSlotCtlLock is held */
> > + if (!s->in_use)
> > + continue;
> > +
> > + /* not our database, skip */
> > + if (s->data.database != InvalidOid && s->data.database !=
> > dboid)
> > + continue;
> > +
> > + SpinLockAcquire(&s->mutex);
> > + slotname = s->data.name;
> > + slot_xmin = s->data.xmin;
> > + slot_catalog_xmin = s->data.catalog_xmin;
> > + SpinLockRelease(&s->mutex);
> > +
> > + if (TransactionIdIsValid(slot_xmin) &&
> > TransactionIdPrecedesOrEquals(slot_xmin, xid))
> > + {
> > + found_conflict = true;
> > +
> > + ereport(WARNING,
> > + (errmsg("slot %s w/ xmin %u conflicts
> > with removed xid %u",
> > + NameStr(slotname),
> > slot_xmin, xid)));
> > + }
> > +
> > + if (TransactionIdIsValid(slot_catalog_xmin) &&
> > TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
> > + {
> > + found_conflict = true;
> > +
> > + ereport(WARNING,
> > + (errmsg("slot %s w/ catalog xmin %u
> > conflicts with removed xid %u",
> > + NameStr(slotname),
> > slot_catalog_xmin, xid)));
> > + }
> > +
> > +
> > + if (found_conflict)
> > + {
> > + elog(WARNING, "Dropping conflicting slot %s",
> > s->data.name.data);
> > + LWLockRelease(ReplicationSlotControlLock); /*
> > avoid deadlock */
> > + ReplicationSlotDropPtr(s);
> > +
> > + /* We released the lock above; so re-scan the slots.
> > */
> > + goto restart;
> > + }
> > + }
> >
> I think this should be refactored so that the two found_conflict cases
> set a 'reason' variable (perhaps an enum?) to the particular reason, and
> then only one warning should be emitted. I also think that LOG might be
> more appropriate than WARNING - as confusing as that is, LOG is more
> severe than WARNING (see docs about log_min_messages).
What I have in mind is:

    ereport(LOG,
            (errcode(ERRCODE_INTERNAL_ERROR),
             errmsg("Dropping conflicting slot %s", s->data.name.data),
             errdetail("%s, removed xid %u.", conflict_str, xid)));

where conflict_str is a dynamically generated string containing
something like "slot xmin: 1234, slot catalog_xmin: 5678". So for the
user, the errdetail will look like:

    "slot xmin: 1234, slot catalog_xmin: 5678, removed xid 9012."

I think the user can figure out whether it was xmin or catalog_xmin or
both that conflicted with the removed xid.
If we don't do it this way, we may not be able to show in a single
message that both xmin and catalog_xmin are conflicting at the same
time.
Does this message look good to you, or did you have something quite
different in mind?
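
To make the proposal concrete, here is a minimal standalone sketch of how
conflict_str could be built. It is not code from the patch: the helper name
describe_slot_conflict() and the xid_* functions are hypothetical stand-ins
(the latter mirror TransactionIdIsValid() and TransactionIdPrecedesOrEquals())
so that the example compiles outside the PostgreSQL tree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Stand-ins for PostgreSQL definitions, so the sketch compiles on its own. */
typedef uint32_t TransactionId;
#define InvalidTransactionId		((TransactionId) 0)
#define FirstNormalTransactionId	((TransactionId) 3)

static bool
xid_is_valid(TransactionId xid)
{
	return xid != InvalidTransactionId;
}

/* Wraparound-aware "a <= b", modelled on TransactionIdPrecedesOrEquals(). */
static bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
	/* special (non-normal) XIDs are compared as plain unsigned integers */
	if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
		return a <= b;
	return (int32_t) (a - b) <= 0;
}

/*
 * Hypothetical helper: write the conflicting horizons of one slot into buf
 * and return true if xmin, catalog_xmin or both conflict with removed_xid.
 * Only the horizons that actually conflict are mentioned.
 */
static bool
describe_slot_conflict(TransactionId slot_xmin,
					   TransactionId slot_catalog_xmin,
					   TransactionId removed_xid,
					   char *buf, size_t buflen)
{
	bool		xmin_conflict;
	bool		catalog_conflict;

	assert(buf != NULL && buflen > 0);
	buf[0] = '\0';

	xmin_conflict = xid_is_valid(slot_xmin) &&
		xid_precedes_or_equals(slot_xmin, removed_xid);
	catalog_conflict = xid_is_valid(slot_catalog_xmin) &&
		xid_precedes_or_equals(slot_catalog_xmin, removed_xid);

	if (xmin_conflict)
		snprintf(buf, buflen, "slot xmin: %u", (unsigned int) slot_xmin);

	if (catalog_conflict)
	{
		size_t		used = strlen(buf);

		/* add a separator only if the xmin part was written first */
		snprintf(buf + used, buflen - used, "%sslot catalog_xmin: %u",
				 used > 0 ? ", " : "", (unsigned int) slot_catalog_xmin);
	}

	return xmin_conflict || catalog_conflict;
}
```

With slot_xmin = 1234, slot_catalog_xmin = 5678 and removed xid 9012, the
buffer ends up as "slot xmin: 1234, slot catalog_xmin: 5678", which is what
would be passed to errdetail() as conflict_str. Writing only the conflicting
horizons is a choice made in this sketch; always printing both values would
work as well.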
>
>
> > @@ -0,0 +1,386 @@
> > +# Demonstrate that logical can follow timeline switches.
> > +#
> > +# Test logical decoding on a standby.
> > +#
> > +use strict;
> > +use warnings;
> > +use 5.8.0;
> > +
> > +use PostgresNode;
> > +use TestLib;
> > +use Test::More tests => 55;
> > +use RecursiveCopy;
> > +use File::Copy;
> > +
> > +my ($stdin, $stdout, $stderr, $ret, $handle, $return);
> > +my $backup_name;
> > +
> > +# Initialize master node
> > +my $node_master = get_new_node('master');
> > +$node_master->init(allows_streaming => 1, has_archiving => 1);
> > +$node_master->append_conf('postgresql.conf', q{
> > +wal_level = 'logical'
> > +max_replication_slots = 4
> > +max_wal_senders = 4
> > +log_min_messages = 'debug2'
> > +log_error_verbosity = verbose
> > +# send status rapidly so we promptly advance xmin on master
> > +wal_receiver_status_interval = 1
> > +# very promptly terminate conflicting backends
> > +max_standby_streaming_delay = '2s'
> > +});
> > +$node_master->dump_info;
> > +$node_master->start;
> > +
> > +$node_master->psql('postgres', q[CREATE DATABASE testdb]);
> > +
> > +$node_master->safe_psql('testdb', q[SELECT * FROM
> > pg_create_physical_replication_slot('decoding_standby');]);
> > +$backup_name = 'b1';
> > +my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
> > +TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d',
> > $node_master->connstr('testdb'), '--slot=decoding_standby');
> > +
> > +sub print_phys_xmin
> > +{
> > + my $slot = $node_master->slot('decoding_standby');
> > + return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
> > +}
> > +
> > +my ($xmin, $catalog_xmin) = print_phys_xmin();
> > +# After slot creation, xmins must be null
> > +is($xmin, '', "xmin null");
> > +is($catalog_xmin, '', "catalog_xmin null");
> > +
> > +my $node_replica = get_new_node('replica');
> > +$node_replica->init_from_backup(
> > + $node_master, $backup_name,
> > + has_streaming => 1,
> > + has_restoring => 1);
> > +$node_replica->append_conf('postgresql.conf',
> > + q[primary_slot_name = 'decoding_standby']);
> > +
> > +$node_replica->start;
> > +$node_master->wait_for_catchup($node_replica, 'replay',
> > $node_master->lsn('flush'));
> > +
> > +# with hot_standby_feedback off, xmin and catalog_xmin must still be null
> > +($xmin, $catalog_xmin) = print_phys_xmin();
> > +is($xmin, '', "xmin null after replica join");
> > +is($catalog_xmin, '', "catalog_xmin null after replica join");
> > +
> > +$node_replica->append_conf('postgresql.conf',q[
> > +hot_standby_feedback = on
> > +]);
> > +$node_replica->restart;
> > +sleep(2); # ensure walreceiver feedback sent
>
> Can we make this more robust? E.g. by waiting till pg_stat_replication
> shows the change on the primary? Because I can guarantee that this'll
> fail on slow buildfarm machines (say the valgrind animals).
>
>
>
>
> > +$node_master->wait_for_catchup($node_replica, 'replay',
> > $node_master->lsn('flush'));
> > +sleep(2); # ensure walreceiver feedback sent
>
> Similar.
Ok. I have put a copy of the get_slot_xmins() function from
t/001_stream_rep.pl into 016_logical_decoding_on_replica.pl, renamed
it to wait_for_phys_mins(), and used it to wait for the
hot_standby_feedback change to propagate to the master. This function
waits for the physical slot's xmin and catalog_xmin to reach the
expected values, which depend on whether there is a logical slot on
the standby and whether hot_standby_feedback is on there.
I was not sure how pg_stat_replication could be used to detect that
the hot_standby_feedback change has reached the master, so I did it
the above way, which I think pretty much does what we want.
The attached v4 patch only has the testcase change and some minor
cleanup in the test file.
--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company
From 1e3c68a644da4aa45ca72190cfa254ccd171f9e3 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <[email protected]>
Date: Tue, 9 Apr 2019 22:06:25 +0530
Subject: [PATCH] Logical decoding on standby.
Author : Andres Freund.
Besides the above main changes, patch includes following :
1. Handle slot conflict recovery by dropping the conflicting slots.
-Amit Khandekar.
2. test/recovery/t/016_logical_decoding_on_replica.pl added.
Original author : Craig Ringer. few changes/additions from Amit Khandekar.
---
src/backend/access/gist/gistxlog.c | 6 +-
src/backend/access/hash/hash_xlog.c | 3 +-
src/backend/access/hash/hashinsert.c | 2 +
src/backend/access/heap/heapam.c | 23 +-
src/backend/access/heap/vacuumlazy.c | 2 +-
src/backend/access/heap/visibilitymap.c | 2 +-
src/backend/access/nbtree/nbtpage.c | 3 +
src/backend/access/nbtree/nbtxlog.c | 4 +-
src/backend/access/spgist/spgvacuum.c | 2 +
src/backend/access/spgist/spgxlog.c | 1 +
src/backend/replication/logical/logical.c | 2 +
src/backend/replication/slot.c | 79 +++++
src/backend/storage/ipc/standby.c | 7 +-
src/backend/utils/cache/lsyscache.c | 16 +
src/include/access/gistxlog.h | 3 +-
src/include/access/hash_xlog.h | 1 +
src/include/access/heapam_xlog.h | 8 +-
src/include/access/nbtxlog.h | 2 +
src/include/access/spgxlog.h | 1 +
src/include/replication/slot.h | 2 +
src/include/storage/standby.h | 2 +-
src/include/utils/lsyscache.h | 1 +
src/include/utils/rel.h | 1 +
.../recovery/t/016_logical_decoding_on_replica.pl | 391 +++++++++++++++++++++
24 files changed, 546 insertions(+), 18 deletions(-)
create mode 100644 src/test/recovery/t/016_logical_decoding_on_replica.pl
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 4fb1855..59a7910 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -342,7 +342,8 @@ gistRedoDeleteRecord(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
- ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+ xldata->onCatalogTable, rnode);
}
if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
@@ -544,7 +545,7 @@ gistRedoPageReuse(XLogReaderState *record)
if (InHotStandby)
{
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
- xlrec->node);
+ xlrec->onCatalogTable, xlrec->node);
}
}
@@ -736,6 +737,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXi
*/
/* XLOG stuff */
+ xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
xlrec_reuse.node = rel->rd_node;
xlrec_reuse.block = blkno;
xlrec_reuse.latestRemovedXid = latestRemovedXid;
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index d7b7098..00c3e0f 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
RelFileNode rnode;
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
- ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+ xldata->onCatalogTable, rnode);
}
action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index e17f017..b67e4e6 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -17,6 +17,7 @@
#include "access/hash.h"
#include "access/hash_xlog.h"
+#include "catalog/catalog.h"
#include "miscadmin.h"
#include "utils/rel.h"
#include "storage/lwlock.h"
@@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
xl_hash_vacuum_one_page xlrec;
XLogRecPtr recptr;
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel);
xlrec.latestRemovedXid = latestRemovedXid;
xlrec.ntuples = ndeletable;
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a05b6a0..bfbb9d3 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7100,12 +7100,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel,
* see comments for vacuum_log_cleanup_info().
*/
XLogRecPtr
-log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
+log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid)
{
xl_heap_cleanup_info xlrec;
XLogRecPtr recptr;
- xlrec.node = rnode;
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
+ xlrec.node = rel->rd_node;
xlrec.latestRemovedXid = latestRemovedXid;
XLogBeginInsert();
@@ -7141,6 +7142,7 @@ log_heap_clean(Relation reln, Buffer buffer,
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
xlrec.latestRemovedXid = latestRemovedXid;
xlrec.nredirected = nredirected;
xlrec.ndead = ndead;
@@ -7191,6 +7193,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
/* nor when there are no tuples to freeze */
Assert(ntuples > 0);
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
xlrec.cutoff_xid = cutoff_xid;
xlrec.ntuples = ntuples;
@@ -7221,7 +7224,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
* heap_buffer, if necessary.
*/
XLogRecPtr
-log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
TransactionId cutoff_xid, uint8 vmflags)
{
xl_heap_visible xlrec;
@@ -7231,6 +7234,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
Assert(BufferIsValid(heap_buffer));
Assert(BufferIsValid(vm_buffer));
+ xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
xlrec.cutoff_xid = cutoff_xid;
xlrec.flags = vmflags;
XLogBeginInsert();
@@ -7651,7 +7655,8 @@ heap_xlog_cleanup_info(XLogReaderState *record)
xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
if (InHotStandby)
- ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
+ ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+ xlrec->onCatalogTable, xlrec->node);
/*
* Actual operation is a no-op. Record type exists to provide a means for
@@ -7687,7 +7692,8 @@ heap_xlog_clean(XLogReaderState *record)
* latestRemovedXid is invalid, skip conflict processing.
*/
if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid))
- ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+ xlrec->onCatalogTable, rnode);
/*
* If we have a full-page image, restore it (using a cleanup lock) and
@@ -7783,7 +7789,9 @@ heap_xlog_visible(XLogReaderState *record)
* rather than killing the transaction outright.
*/
if (InHotStandby)
- ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode);
+ ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid,
+ xlrec->onCatalogTable,
+ rnode);
/*
* Read the heap page, if it still exists. If the heap file has dropped or
@@ -7920,7 +7928,8 @@ heap_xlog_freeze_page(XLogReaderState *record)
TransactionIdRetreat(latestRemovedXid);
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
- ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+ xlrec->onCatalogTable, rnode);
}
if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index c9d8312..fad08e0 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -475,7 +475,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
* No need to write the record at all unless it contains a valid value
*/
if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
- (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+ (void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid);
}
/*
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 64dfe06..c5fdd64 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -281,7 +281,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
if (XLogRecPtrIsInvalid(recptr))
{
Assert(!InRecovery);
- recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
+ recptr = log_heap_visible(rel, heapBuf, vmBuf,
cutoff_xid, flags);
/*
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 8ade165..745cbc5 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -31,6 +31,7 @@
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
+#include "utils/lsyscache.h"
#include "utils/snapmgr.h"
static void _bt_cachemetadata(Relation rel, BTMetaPageData *input);
@@ -773,6 +774,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX
*/
/* XLOG stuff */
+ xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
xlrec_reuse.node = rel->rd_node;
xlrec_reuse.block = blkno;
xlrec_reuse.latestRemovedXid = latestRemovedXid;
@@ -1140,6 +1142,7 @@ _bt_delitems_delete(Relation rel, Buffer buf,
XLogRecPtr recptr;
xl_btree_delete xlrec_delete;
+ xlrec_delete.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
xlrec_delete.latestRemovedXid = latestRemovedXid;
xlrec_delete.nitems = nitems;
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 0a85d8b..2617d55 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -526,7 +526,8 @@ btree_xlog_delete(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
- ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+ ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+ xlrec->onCatalogTable, rnode);
}
/*
@@ -810,6 +811,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
if (InHotStandby)
{
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+ xlrec->onCatalogTable,
xlrec->node);
}
}
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index b9311ce..ef4910f 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -27,6 +27,7 @@
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
/* Entry in pending-list of TIDs we need to revisit */
@@ -502,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
OffsetNumber itemnos[MaxIndexTuplesPerPage];
spgxlogVacuumRedirect xlrec;
+ xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid);
xlrec.nToPlaceholder = 0;
xlrec.newestRedirectXid = InvalidTransactionId;
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index ebe6ae8..800609c 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+ xldata->onCatalogTable,
node);
}
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6e5bc12..e8b7af4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -94,6 +94,7 @@ CheckLogicalDecodingRequirements(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding requires a database connection")));
+#ifdef NOT_ANYMORE
/* ----
* TODO: We got to change that someday soon...
*
@@ -111,6 +112,7 @@ CheckLogicalDecodingRequirements(void)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("logical decoding cannot be used while in recovery")));
+#endif
}
/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 006446b..5785d2f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1064,6 +1064,85 @@ ReplicationSlotReserveWal(void)
}
}
+void
+ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid)
+{
+ int i;
+ bool found_conflict = false;
+
+ if (max_replication_slots <= 0)
+ return;
+
+restart:
+ if (found_conflict)
+ {
+ CHECK_FOR_INTERRUPTS();
+ /*
+ * Wait awhile for them to die so that we avoid flooding an
+ * unresponsive backend when system is heavily loaded.
+ */
+ pg_usleep(100000);
+ found_conflict = false;
+ }
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s;
+ NameData slotname;
+ TransactionId slot_xmin;
+ TransactionId slot_catalog_xmin;
+
+ s = &ReplicationSlotCtl->replication_slots[i];
+
+ /* cannot change while ReplicationSlotCtlLock is held */
+ if (!s->in_use)
+ continue;
+
+ /* not our database, skip */
+ if (s->data.database != InvalidOid && s->data.database != dboid)
+ continue;
+
+ SpinLockAcquire(&s->mutex);
+ slotname = s->data.name;
+ slot_xmin = s->data.xmin;
+ slot_catalog_xmin = s->data.catalog_xmin;
+ SpinLockRelease(&s->mutex);
+
+ if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid))
+ {
+ found_conflict = true;
+
+ ereport(WARNING,
+ (errmsg("slot %s w/ xmin %u conflicts with removed xid %u",
+ NameStr(slotname), slot_xmin, xid)));
+ }
+
+ if (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+ {
+ found_conflict = true;
+
+ ereport(WARNING,
+ (errmsg("slot %s w/ catalog xmin %u conflicts with removed xid %u",
+ NameStr(slotname), slot_catalog_xmin, xid)));
+ }
+
+
+ if (found_conflict)
+ {
+ elog(WARNING, "Dropping conflicting slot %s", s->data.name.data);
+ LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
+ ReplicationSlotDropPtr(s);
+
+ /* We released the lock above; so re-scan the slots. */
+ goto restart;
+ }
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+}
+
+
/*
* Flush all replication slots to disk.
*
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 215f146..75dbdb9 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -23,6 +23,7 @@
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
@@ -291,7 +292,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
}
void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+ bool onCatalogTable, RelFileNode node)
{
VirtualTransactionId *backends;
@@ -312,6 +314,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
ResolveRecoveryConflictWithVirtualXIDs(backends,
PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
+
+ if (onCatalogTable)
+ ResolveRecoveryConflictWithSlots(node.dbNode, latestRemovedXid);
}
void
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index b4f2d0f..f4da4bc 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -18,7 +18,9 @@
#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
+#include "access/table.h"
#include "bootstrap/bootstrap.h"
+#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_am.h"
#include "catalog/pg_amop.h"
@@ -1893,6 +1895,20 @@ get_rel_persistence(Oid relid)
return result;
}
+bool
+get_rel_logical_catalog(Oid relid)
+{
+ bool res;
+ Relation rel;
+
+ /* assume previously locked */
+ rel = heap_open(relid, NoLock);
+ res = RelationIsAccessibleInLogicalDecoding(rel);
+ heap_close(rel, NoLock);
+
+ return res;
+}
+
/* ---------- TRANSFORM CACHE ---------- */
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 9990d97..887a377 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -47,10 +47,10 @@ typedef struct gistxlogPageUpdate
*/
typedef struct gistxlogDelete
{
+ bool onCatalogTable;
RelFileNode hnode; /* RelFileNode of the heap the index currently
* points at */
uint16 ntodelete; /* number of deleted offsets */
-
/*
* In payload of blk 0 : todelete OffsetNumbers
*/
@@ -95,6 +95,7 @@ typedef struct gistxlogPageDelete
*/
typedef struct gistxlogPageReuse
{
+ bool onCatalogTable;
RelFileNode node;
BlockNumber block;
TransactionId latestRemovedXid;
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 53b682c..fd70b55 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -263,6 +263,7 @@ typedef struct xl_hash_init_bitmap_page
*/
typedef struct xl_hash_vacuum_one_page
{
+ bool onCatalogTable;
TransactionId latestRemovedXid;
int ntuples;
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 22cd13c..482c874 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -237,6 +237,7 @@ typedef struct xl_heap_update
*/
typedef struct xl_heap_clean
{
+ bool onCatalogTable;
TransactionId latestRemovedXid;
uint16 nredirected;
uint16 ndead;
@@ -252,6 +253,7 @@ typedef struct xl_heap_clean
*/
typedef struct xl_heap_cleanup_info
{
+ bool onCatalogTable;
RelFileNode node;
TransactionId latestRemovedXid;
} xl_heap_cleanup_info;
@@ -332,6 +334,7 @@ typedef struct xl_heap_freeze_tuple
*/
typedef struct xl_heap_freeze_page
{
+ bool onCatalogTable;
TransactionId cutoff_xid;
uint16 ntuples;
} xl_heap_freeze_page;
@@ -346,6 +349,7 @@ typedef struct xl_heap_freeze_page
*/
typedef struct xl_heap_visible
{
+ bool onCatalogTable;
TransactionId cutoff_xid;
uint8 flags;
} xl_heap_visible;
@@ -395,7 +399,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
extern const char *heap2_identify(uint8 info);
extern void heap_xlog_logical_rewrite(XLogReaderState *r);
-extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
+extern XLogRecPtr log_heap_cleanup_info(Relation rel,
TransactionId latestRemovedXid);
extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
OffsetNumber *redirected, int nredirected,
@@ -414,7 +418,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
bool *totally_frozen);
extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
xl_heap_freeze_tuple *xlrec_tp);
-extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags);
#endif /* HEAPAM_XLOG_H */
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 9beccc8..f64a33c 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -126,6 +126,7 @@ typedef struct xl_btree_split
*/
typedef struct xl_btree_delete
{
+ bool onCatalogTable;
TransactionId latestRemovedXid;
int nitems;
@@ -139,6 +140,7 @@ typedef struct xl_btree_delete
*/
typedef struct xl_btree_reuse_page
{
+ bool onCatalogTable;
RelFileNode node;
BlockNumber block;
TransactionId latestRemovedXid;
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index ee8fc6f..d535441 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot
typedef struct spgxlogVacuumRedirect
{
+ bool onCatalogTable;
uint16 nToPlaceholder; /* number of redirects to make placeholders */
OffsetNumber firstPlaceholder; /* first placeholder tuple to remove */
TransactionId newestRedirectXid; /* newest XID of removed redirects */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8f1d66..4e0776a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -205,4 +205,6 @@ extern void CheckPointReplicationSlots(void);
extern void CheckSlotRequirements(void);
+extern void ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid);
+
#endif /* SLOT_H */
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2361243..f276c7e 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void);
extern void ShutdownRecoveryTransactionEnvironment(void);
extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
- RelFileNode node);
+ bool catalogTable, RelFileNode node);
extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 9606d02..78bc639 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -131,6 +131,7 @@ extern char get_rel_relkind(Oid relid);
extern bool get_rel_relispartition(Oid relid);
extern Oid get_rel_tablespace(Oid relid);
extern char get_rel_persistence(Oid relid);
+extern bool get_rel_logical_catalog(Oid relid);
extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes);
extern Oid get_transform_tosql(Oid typid, Oid langid, List *trftypes);
extern bool get_typisdefined(Oid typid);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 89a7fbf..c36e228 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -16,6 +16,7 @@
#include "access/tupdesc.h"
#include "access/xlog.h"
+#include "catalog/catalog.h"
#include "catalog/pg_class.h"
#include "catalog/pg_index.h"
#include "catalog/pg_publication.h"
diff --git a/src/test/recovery/t/016_logical_decoding_on_replica.pl b/src/test/recovery/t/016_logical_decoding_on_replica.pl
new file mode 100644
index 0000000..9ee79b0
--- /dev/null
+++ b/src/test/recovery/t/016_logical_decoding_on_replica.pl
@@ -0,0 +1,391 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Test logical decoding on a standby.
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 51;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $return);
+my $backup_name;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--slot=decoding_standby');
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state
+sub wait_for_phys_mins
+{
+ my ($node, $slotname, $check_expr) = @_;
+
+ $node->poll_query_until(
+ 'postgres', qq[
+ SELECT $check_expr
+ FROM pg_catalog.pg_replication_slots
+ WHERE slot_name = '$slotname';
+ ]) or die "Timed out waiting for slot xmins to advance";
+
+ my $slotinfo = $node->slot($slotname);
+ return ($slotinfo->{'xmin'}, $slotinfo->{'catalog_xmin'});
+}
+
+sub print_phys_xmin
+{
+ my $slot = $node_master->slot('decoding_standby');
+ return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# After slot creation, xmins must be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+ $node_master, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_replica->append_conf('postgresql.conf',
+ q[primary_slot_name = 'decoding_standby']);
+
+$node_replica->start;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# with hot_standby_feedback off, xmin and catalog_xmin must still be null
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "xmin null after replica join");
+is($catalog_xmin, '', "catalog_xmin null after replica join");
+
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+ "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+ 0, 'logical slot creation on standby succeeded')
+ or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+sub print_logical_xmin
+{
+ my $slot = $node_replica->slot('standby_logical');
+ return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+$node_master->safe_psql('testdb', 'DROP TABLE test_table');
+$node_master->safe_psql('testdb', 'VACUUM');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+# Should show the inserts even when the table is dropped on master
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+ or BAIL_OUT('cannot continue if slot replay fails');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+for my $i (0 .. 2000)
+{
+ $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null");
+cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys catalog_xmin now that the standby's
+# slot doesn't hold it down as far.
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced");
+
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream oldestXid retention
+#########################################################
+
+sub test_oldest_xid_retention()
+{
+ # First burn some xids on the master in another DB, so we push the master's
+ # nextXid ahead.
+ foreach my $i (1 .. 100)
+ {
+ $node_master->safe_psql('postgres', 'SELECT txid_current()');
+ }
+
+ # Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
+ # past our needed xmin. The only way we have visibility into that is to force
+ # a checkpoint.
+ $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+ foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+ {
+ $node_master->safe_psql($dbname, 'VACUUM FREEZE');
+ }
+ sleep(1);
+ $node_master->safe_psql('postgres', 'CHECKPOINT');
+ IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+ or die "pg_controldata failed with $?";
+ my @checkpoint = split('\n', $stdout);
+ my ($oldestXid, $nextXid) = ('', '');
+ foreach my $line (@checkpoint)
+ {
+ if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/)
+ {
+ $nextXid = $1;
+ }
+ if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+ {
+ $oldestXid = $1;
+ }
+ }
+ die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+ my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+ my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+
+ note "upstream oldestXid $oldestXid, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin";
+
+ $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+ return ($oldestXid);
+}
+
+my ($oldestXid) = test_oldest_xid_retention();
+
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+
+########################################################################
+# Recovery conflict: conflicting replication slot should get dropped
+########################################################################
+
+# One way to reproduce a recovery conflict is to run VACUUM FULL on the master
+# with hot_standby_feedback turned off on the standby.
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_replica->restart;
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. Both should be NULL since hs_feedback is off
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+ "xmin IS NULL AND catalog_xmin IS NULL");
+$node_master->safe_psql('testdb', 'VACUUM FULL');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+isnt($ret, 0, 'usage of slot failed as expected');
+like($stderr, qr/does not exist/, 'slot not found as expected');
+
+# Re-create the slot now that we know it is dropped
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+ 0, 'logical slot creation on standby succeeded')
+ or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+# Set hot_standby_feedback back on
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. Both should be non-NULL since hs_feedback is on and
+# there is a logical slot present on standby.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+ "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+
+# Make sure slots on replicas are droppable, and properly clear the upstream's xmin
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. catalog_xmin should become NULL because we dropped
+# the logical slot.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+ "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery: drop database drops idle slots
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot');
+$node_replica->command_ok(['pg_recvlogical', '-v', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+ 'database dropped on standby');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, '', 'slot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+##################################################
+# Recovery: drop database drops in-use slots
+##################################################
+
+# This time, have the slot in-use on the downstream DB when we drop it.
+note "Testing dropdb when downstream slot is in-use";
+$node_master->psql('postgres', q[CREATE DATABASE testdb2]);
+
+note "creating slot dodropslot2";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot'],
+ 'pg_recvlogical created dodropslot2');
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use
+note "starting pg_recvlogical";
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+sleep(1);
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+ or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+while ($node_replica->slot('dodropslot2')->{'active_pid'})
+{
+ sleep(1);
+ note "waiting for walsender to exit";
+}
+
+note "walsender exited, waiting for pg_recvlogical to exit";
+
+# our client should've terminated in response to the walsender error
+eval {
+ $handle->finish;
+};
+$return = $?;
+if ($return) {
+ is($return, 256, "pg_recvlogical terminated by server");
+ like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict');
+ like($stderr, qr/User was connected to a database that must be dropped./, 'recvlogical recovery conflict db');
+}
+
+is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited');
+
+# The slot should be dropped by recovery now
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+ 'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
--
2.1.4