Hi,

On 4/7/23 9:50 AM, Andres Freund wrote:
Hi,
Here's my current working state - I'll go to bed soon.

Thanks a lot for this Andres!


Changes:

- shared catalog relations weren't handled correctly, because the dboid is
   InvalidOid for them. I wrote a test for that as well.

- ReplicationSlotsComputeRequiredXmin() took invalidated logical slots into
   account (ReplicationSlotsComputeLogicalRestartLSN() too, but it never looks
   at logical slots)

- I don't think the subset of slot xids that were checked when invalidating
   was right. We need to check effective_xmin and effective_catalog_xmin - the
   latter was using catalog_xmin.

- similarly, it wasn't right that specifically those two fields were
   overwritten when invalidated - as that was done, I suspect the changes might
   get lost on a restart...

- As mentioned previously, I did not like all the functions in slot.h, nor
   their naming. Not yet quite finished with that, but a good bit further

- There were a lot of unrelated changes, e.g. removing comments like
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.

- I still don't like the order of the patches, fixing the walsender patches
   after introducing support for logical decoding on standby. Reordered.

- I don't think logical slots being invalidated as checked e.g. in
   pg_logical_replication_slot_advance()

- I didn't like much that InvalidatePossiblyObsoleteSlot() switched between
   kill() and SendProcSignal() based on the "conflict". There very well could
   be reasons to use InvalidatePossiblyObsoleteSlot() with an xid from outside
   of the startup process in the future. Instead I made it differentiate based
   on MyBackendType == B_STARTUP.
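
To make the shared-catalog and effective_xmin points above concrete, here is a minimal standalone sketch of the kind of check being described. It is not the code from slot.c: MockSlot, xid_precedes_or_equals() and slot_xids_conflict() are hypothetical names, the wraparound comparison is simplified, and the exact conditions in the real LogicalReplicationSlotXidsConflict() may differ.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint32_t Oid;

#define InvalidOid ((Oid) 0)
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical stand-in for the slot fields relevant to the check. */
typedef struct MockSlot
{
    Oid           database;
    TransactionId effective_xmin;
    TransactionId effective_catalog_xmin;
} MockSlot;

/* Simplified wraparound-aware "a <= b"; assumes both xids are normal xids. */
static bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) <= 0;
}

/* Does the slot still need rows that the conflict horizon allows removing? */
static bool
slot_xids_conflict(const MockSlot *s, Oid dboid, TransactionId horizon)
{
    /* dboid is InvalidOid for shared catalogs: slots of any database qualify. */
    if (dboid != InvalidOid && s->database != dboid)
        return false;

    if (s->effective_xmin != InvalidTransactionId &&
        xid_precedes_or_equals(s->effective_xmin, horizon))
        return true;

    if (s->effective_catalog_xmin != InvalidTransactionId &&
        xid_precedes_or_equals(s->effective_catalog_xmin, horizon))
        return true;

    return false;
}
```

In this sketch a slot in database 5 with effective_catalog_xmin = 100 conflicts with a horizon of 100 in database 5, and with any horizon >= 100 coming from a shared catalog, but not with a change in another database.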


Thanks for all of this and the above explanations.


I also:

Added new patch that replaces invalidated_at with a new enum, 'invalidated',
listing the reason for the invalidation.

Yeah, that's a great idea.

I added a check for !invalidated to
ReplicationSlotsComputeRequiredLSN() etc.
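
As an illustration of the !invalidated check, a rough standalone sketch follows. MockSlot and compute_required_lsn() are hypothetical, and the order of the enum values is an assumption; only the RS_INVAL_* names come from the patch. The point is simply that an invalidated slot no longer contributes its restart_lsn to the minimum.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Value names as used in the patch; their order here is an assumption. */
typedef enum ReplicationSlotInvalidationCause
{
    RS_INVAL_NONE,
    RS_INVAL_WAL,
    RS_INVAL_XID,
    RS_INVAL_WAL_LEVEL
} ReplicationSlotInvalidationCause;

/* Hypothetical stand-in for the slot fields relevant here. */
typedef struct MockSlot
{
    bool       in_use;
    XLogRecPtr restart_lsn;
    ReplicationSlotInvalidationCause invalidated;
} MockSlot;

/* Oldest restart_lsn still required by any valid, in-use slot. */
static XLogRecPtr
compute_required_lsn(const MockSlot *slots, size_t nslots)
{
    XLogRecPtr min_required = InvalidXLogRecPtr;

    for (size_t i = 0; i < nslots; i++)
    {
        const MockSlot *s = &slots[i];

        if (!s->in_use)
            continue;

        /* An invalidated slot must not hold back WAL removal. */
        if (s->invalidated != RS_INVAL_NONE)
            continue;

        if (s->restart_lsn != InvalidXLogRecPtr &&
            (min_required == InvalidXLogRecPtr ||
             s->restart_lsn < min_required))
            min_required = s->restart_lsn;
    }

    return min_required;
}
```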


Looked at 65-0001 and it looks good to me.

Added new patch moving checks for invalid logical slots into
CreateDecodingContext(). Otherwise we end up with 5 or so checks, which makes
no sense. As far as I can tell the old message in
pg_logical_slot_get_changes_guts() was bogus, one couldn't get there having
"never previously reserved WAL"


Looked at 65-0002 and it looks good to me.

Split "Handle logical slot conflicts on standby." into two. I'm not sure that
should stay that way, but it made it easier to hack on
InvalidateObsoleteReplicationSlots.


Looked at 65-0003 and the others.

The code is easier to read and understand now that the
ReplicationSlotInvalidationCause enum has been created and data.invalidated
makes use of it. It does simplify the review, and that looks good to me.


Todo:
- write a test that invalidated logical slots stay invalidated across a restart

Done in 65-66-0008 attached.

- write a test that invalidated logical slots do not lead to retaining WAL

I'm not sure how to do that, since pg_switch_wal() and friends can't be
executed on a standby.

- Further evolve the API of InvalidateObsoleteReplicationSlots()
   - pass in the ReplicationSlotInvalidationCause we're trying to conflict on?
   - rename xid to snapshotConflictHorizon, that'd be more in line with the
     ResolveRecoveryConflictWithSnapshot and easier to understand, I think


Done. The new API can be found in
v65-66-InvalidateObsoleteReplicationSlots_API.patch attached. It propagates the
cause to InvalidatePossiblyObsoleteSlot(), where a switch/case can now be used.
The "default" case does not emit an error since this code runs as part of
checkpoint.
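
For readers who do not want to open the attachment, the dispatch can be sketched standalone as follows. This is a simplified model, not the patch itself: MockSlot, its xids_conflict flag (standing in for the result of LogicalReplicationSlotXidsConflict()) and determine_conflict() are hypothetical, and the enum order is assumed.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Value names as used in the patch; their order here is an assumption. */
typedef enum ReplicationSlotInvalidationCause
{
    RS_INVAL_NONE,
    RS_INVAL_WAL,
    RS_INVAL_XID,
    RS_INVAL_WAL_LEVEL
} ReplicationSlotInvalidationCause;

/* Hypothetical stand-in for the slot state the switch looks at. */
typedef struct MockSlot
{
    bool       is_logical;
    XLogRecPtr restart_lsn;
    bool       xids_conflict;   /* precomputed xid conflict result */
} MockSlot;

/* Return the cause if the slot conflicts on it, else RS_INVAL_NONE. */
static ReplicationSlotInvalidationCause
determine_conflict(const MockSlot *s, XLogRecPtr oldestLSN,
                   ReplicationSlotInvalidationCause cause)
{
    ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;

    switch (cause)
    {
        case RS_INVAL_WAL:
            if (oldestLSN != InvalidXLogRecPtr &&
                s->restart_lsn != InvalidXLogRecPtr &&
                s->restart_lsn < oldestLSN)
                conflict = cause;
            break;
        case RS_INVAL_XID:
            if (s->is_logical && s->xids_conflict)
                conflict = cause;
            break;
        case RS_INVAL_WAL_LEVEL:
            if (s->is_logical)
                conflict = cause;
            break;
        default:
            /* Runs as part of checkpoint: report no conflict, do not error. */
            break;
    }

    return conflict;
}
```

One consequence of passing the cause in is that each call only looks for that one kind of conflict. For example, a logical slot with a conflicting xid is left alone when the caller asks about RS_INVAL_WAL and its restart_lsn is recent enough.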

- The test could stand a bit of cleanup and consolidation
   - No need to start 4 psql processes to do 4 updates, just do it in one
     safe_psql()

Right, done in
v65-66-0008-New-TAP-test-for-logical-decoding-on-standby.patch attached.

   - the sequence of drop_logical_slots(), create_logical_slots(),
     change_hot_standby_feedback_and_wait_for_xmins(), make_slot_active() is
     repeated quite a few times

Grouped in reactive_slots_change_hfs_and_wait_for_xmins() in 65-66-0008
attached.

   - the stats queries checking for specific conflict counts, including
     preceding tests, is pretty painful. I suggest to reset the stats at the
     end of the test instead (likely also do the drop_logical_slot() there).

Good idea, done in 65-66-0008 attached.

   - it's hard to correlate postgres log and the tap test, because the slots
     are named the same across all tests. Perhaps they could have a per-test
     prefix?

Good point. Done in 65-66-0008 attached. Thanks to that and the stats reset, the
check for invalidation is now done in a single function,
check_for_invalidation(), which looks for invalidation messages in the logfile
and checks pg_stat_database_conflicts.

Thanks for the suggestions: the TAP test is now easier to read/understand.

   - numbering tests is a PITA, I had to renumber the later ones, when adding a
     test for shared catalog tables

Yeah, sorry about that, it has been fixed in V63.

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a4153518fd..8dd8d40c07 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6818,7 +6818,7 @@ CreateCheckPoint(int flags)
        XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
        KeepLogSeg(recptr, &_logSegNo);
        if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid,
-                                                                                  InvalidTransactionId))
+                                                                                  InvalidTransactionId, RS_INVAL_WAL))
        {
                /*
                 * Some slots have been invalidated; recalculate the old-segment
@@ -7263,7 +7263,7 @@ CreateRestartPoint(int flags)
        endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
        KeepLogSeg(endptr, &_logSegNo);
        if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid,
-                                                                                  InvalidTransactionId))
+                                                                                  InvalidTransactionId, RS_INVAL_WAL))
        {
                /*
                 * Some slots have been invalidated; recalculate the old-segment
@@ -7988,7 +7988,8 @@ xlog_redo(XLogReaderState *record)
                        xlrec.wal_level < WAL_LEVEL_LOGICAL &&
                        wal_level >= WAL_LEVEL_LOGICAL)
                        InvalidateObsoleteReplicationSlots(0, InvalidOid,
-                                                                                          InvalidTransactionId);
+                                                                                          InvalidTransactionId,
+                                                                                          RS_INVAL_WAL_LEVEL);
 
                LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->MaxConnections = xlrec.MaxConnections;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 411f6c0149..3c527742de 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1322,12 +1322,13 @@ LogicalReplicationSlotXidsConflict(ReplicationSlot *s, Oid dboid, TransactionId
  */
 static bool
 InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
-                                                          Oid dboid, TransactionId xid, bool *invalidated)
+                                                          Oid dboid, TransactionId xid, bool *invalidated,
+                                                          ReplicationSlotInvalidationCause cause)
 {
        int                     last_signaled_pid = 0;
        bool            released_lock = false;
        bool            invalidate_all_logical = !TransactionIdIsValid(xid) &&
-               oldestLSN == InvalidXLogRecPtr;
+       oldestLSN == InvalidXLogRecPtr;
 
 
        for (;;)
@@ -1363,14 +1364,30 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
                 */
                if (s->data.invalidated == RS_INVAL_NONE)
                {
-                       if (oldestLSN != InvalidXLogRecPtr && s->data.restart_lsn != InvalidXLogRecPtr &&
-                               s->data.restart_lsn < oldestLSN)
-                               conflict = RS_INVAL_WAL;
-                       if (TransactionIdIsValid(xid) && SlotIsLogical(s) &&
-                               LogicalReplicationSlotXidsConflict(s, dboid, xid))
-                               conflict = RS_INVAL_XID;
-                       else if (invalidate_all_logical && SlotIsLogical(s))
-                               conflict = RS_INVAL_WAL_LEVEL;
+                       switch (cause)
+                       {
+                               case RS_INVAL_WAL:
+                                       if (oldestLSN != InvalidXLogRecPtr && s->data.restart_lsn != InvalidXLogRecPtr &&
+                                               s->data.restart_lsn < oldestLSN)
+                                               conflict = cause;
+                                       break;
+                               case RS_INVAL_XID:
+                                       if (TransactionIdIsValid(xid) && SlotIsLogical(s) &&
+                                               LogicalReplicationSlotXidsConflict(s, dboid, xid))
+                                               conflict = cause;
+                                       break;
+                               case RS_INVAL_WAL_LEVEL:
+                                       if (invalidate_all_logical && SlotIsLogical(s))
+                                               conflict = cause;
+                                       break;
+                               default:
+
+                                       /*
+                                        * this runs as part of checkpoint, so avoid raising
+                                        * errors if possible
+                                        */
+                                       break;
+                       }
                }
 
                /* if there's no conflict, we're done */
@@ -1511,16 +1528,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
  *      be invalidated. Note that WaitExceedsMaxStandbyDelay() is not taken into
  *      account here (as opposed to ResolveRecoveryConflictWithVirtualXIDs()): XXXX
  *
- *
- * XXX: Should we have the caller pass in a specific
- * ReplicationSlotInvalidationCause that we should search for? That'd likely
- * make some things a bit neater.
- *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
 InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid,
-                                                                  TransactionId xid)
+                                                                  TransactionId snapshotConflictHorizon,
+                                                                  ReplicationSlotInvalidationCause cause)
 {
        XLogRecPtr      oldestLSN;
        bool            invalidated = false;
@@ -1539,7 +1552,9 @@ restart:
                if (!s->in_use)
                        continue;
 
-               if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, dboid, xid, &invalidated))
+               if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, dboid,
+                                                                                  snapshotConflictHorizon, &invalidated,
+                                                                                  cause))
                {
                        /* if the lock was released, start from scratch */
                        goto restart;
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index ce5842b0db..77166127f2 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -496,7 +496,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
 
        if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
                InvalidateObsoleteReplicationSlots(0, locator.dbOid,
-                                                                                  snapshotConflictHorizon);
+                                                                                  snapshotConflictHorizon, RS_INVAL_XID);
 }
 
 /*
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 35cafe94bc..1430c7e908 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -231,7 +231,8 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid,
-                                                                                          TransactionId xid);
+                                                                                          TransactionId snapshotConflictHorizon,
+                                                                                          ReplicationSlotInvalidationCause cause);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int     ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl 
b/src/test/recovery/t/035_standby_logical_decoding.pl
index e2c46a6bf6..561dcd33c3 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -51,16 +51,23 @@ sub wait_for_xmins
 # Create the required logical slots on standby.
 sub create_logical_slots
 {
-       my ($node) = @_;
-       $node->create_logical_slot_on_standby($node_primary, 'inactiveslot', 
'testdb');
-       $node->create_logical_slot_on_standby($node_primary, 'activeslot', 
'testdb');
+       my ($node, $slot_prefix) = @_;
+
+       my $active_slot = $slot_prefix . 'activeslot';
+       my $inactive_slot = $slot_prefix . 'inactiveslot';
+       $node->create_logical_slot_on_standby($node_primary, 
qq($inactive_slot), 'testdb');
+       $node->create_logical_slot_on_standby($node_primary, qq($active_slot), 
'testdb');
 }
 
 # Drop the logical slots on standby.
 sub drop_logical_slots
 {
-       $node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
-       $node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+       my ($slot_prefix) = @_;
+       my $active_slot = $slot_prefix . 'activeslot';
+       my $inactive_slot = $slot_prefix . 'inactiveslot';
+
+       $node_standby->psql('postgres', qq[SELECT 
pg_drop_replication_slot('$inactive_slot')]);
+       $node_standby->psql('postgres', qq[SELECT 
pg_drop_replication_slot('$active_slot')]);
 }
 
 # Acquire one of the standby logical slots created by create_logical_slots().
@@ -68,16 +75,17 @@ sub drop_logical_slots
 # If wait is not true it means we are testing a known failure scenario.
 sub make_slot_active
 {
-       my ($node, $wait, $to_stdout, $to_stderr) = @_;
+       my ($node, $slot_prefix, $wait, $to_stdout, $to_stderr) = @_;
        my $slot_user_handle;
 
-       $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', 
$node->connstr('testdb'), '-S', 'activeslot', '-o', 'include-xids=0', '-o', 
'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, 
'2>', $to_stderr);
+       my $active_slot = $slot_prefix . 'activeslot';
+       $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', 
$node->connstr('testdb'), '-S', qq($active_slot), '-o', 'include-xids=0', '-o', 
'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, 
'2>', $to_stderr);
 
        if ($wait)
        {
                # make sure activeslot is in use
                $node->poll_query_until('testdb',
-                       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots 
WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+                       qq[SELECT EXISTS (SELECT 1 FROM pg_replication_slots 
WHERE slot_name = '$active_slot' AND active_pid IS NOT NULL)]
                ) or die "slot never became active";
        }
        return $slot_user_handle;
@@ -104,10 +112,10 @@ sub check_pg_recvlogical_stderr
 # that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
 sub check_slots_dropped
 {
-       my ($slot_user_handle) = @_;
+       my ($slot_prefix, $slot_user_handle) = @_;
 
-       is($node_standby->slot('inactiveslot')->{'slot_type'}, '', 
'inactiveslot on standby dropped');
-       is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on 
standby dropped');
+       is($node_standby->slot($slot_prefix . 'inactiveslot')->{'slot_type'}, 
'', 'inactiveslot on standby dropped');
+       is($node_standby->slot($slot_prefix . 'activeslot')->{'slot_type'}, '', 
'activeslot on standby dropped');
 
        check_pg_recvlogical_stderr($slot_user_handle, "conflict with 
recovery");
 }
@@ -170,6 +178,51 @@ sub check_slots_conflicting_status
        }
 }
 
+# Drop the slots, re-create them, change hot_standby_feedback,
+# check xmin and catalog_xmin values, make slot active and reset stat.
+sub reactive_slots_change_hfs_and_wait_for_xmins
+{
+       my ($previous_slot_prefix, $slot_prefix, $hsf, $invalidated) = @_;
+
+       # drop the logical slots
+       drop_logical_slots($previous_slot_prefix);
+
+       # create the logical slots
+       create_logical_slots($node_standby, $slot_prefix);
+
+       change_hot_standby_feedback_and_wait_for_xmins($hsf, $invalidated);
+
+       $handle = make_slot_active($node_standby, $slot_prefix, 1, \$stdout, \$stderr);
+
+       # reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
+       $node_standby->psql('testdb', q[select pg_stat_reset();]);
+}
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+sub check_for_invalidation
+{
+       my ($slot_prefix, $log_start, $test_name) = @_;
+
+       my $active_slot = $slot_prefix . 'activeslot';
+       my $inactive_slot = $slot_prefix . 'inactiveslot';
+
+       # message should be issued
+       ok( find_in_log(
+               $node_standby,
+               "invalidating obsolete replication slot \"$inactive_slot\"", $log_start),
+               "inactiveslot slot invalidation is logged $test_name");
+
+       ok( find_in_log(
+               $node_standby,
+               "invalidating obsolete replication slot \"$active_slot\"", $log_start),
+               "activeslot slot invalidation is logged $test_name");
+
+       # Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+       ok( $node_standby->poll_query_until(
+               'postgres',
+               "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+               'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+}
 ########################
 # Initialize primary node
 ########################
@@ -233,7 +286,7 @@ 
$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
 ##################################################
 
 # create the logical slots
-create_logical_slots($node_standby);
+create_logical_slots($node_standby, 'behaves_ok_');
 
 $node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y 
text);]);
 $node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, 
s::text FROM generate_series(1,10) s;]);
@@ -241,7 +294,7 @@ $node_primary->safe_psql('testdb', qq[INSERT INTO 
decoding_test(x,y) SELECT s, s
 $node_primary->wait_for_replay_catchup($node_standby);
 
 my $result = $node_standby->safe_psql('testdb',
-       qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]);
+       qq[SELECT pg_logical_slot_get_changes('behaves_ok_activeslot', NULL, 
NULL);]);
 
 # test if basic decoding works
 is(scalar(my @foobar = split /^/m, $result),
@@ -263,13 +316,13 @@ COMMIT};
 $node_primary->wait_for_replay_catchup($node_standby);
 
 my $stdout_sql = $node_standby->safe_psql('testdb',
-       qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+       qq[SELECT data FROM 
pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL, 
'include-xids', '0', 'skip-empty-xacts', '1');]
 );
 
 is($stdout_sql, $expected, 'got expected output from SQL decoding session');
 
 my $endpos = $node_standby->safe_psql('testdb',
-       "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) 
ORDER BY lsn DESC LIMIT 1;"
+       "SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', 
NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
 );
 
 # Insert some rows after $endpos, which we won't read.
@@ -280,7 +333,7 @@ $node_primary->safe_psql('testdb',
 $node_primary->wait_for_catchup($node_standby);
 
 my $stdout_recv = $node_standby->pg_recvlogical_upto(
-    'testdb', 'activeslot', $endpos, $default_timeout,
+    'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
     'include-xids'     => '0',
     'skip-empty-xacts' => '1');
 chomp($stdout_recv);
@@ -288,11 +341,11 @@ is($stdout_recv, $expected,
     'got same expected output from pg_recvlogical decoding session');
 
 $node_standby->poll_query_until('testdb',
-       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 
'activeslot' AND active_pid IS NULL)"
+       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 
'behaves_ok_activeslot' AND active_pid IS NULL)"
 ) or die "slot never became inactive";
 
 $stdout_recv = $node_standby->pg_recvlogical_upto(
-    'testdb', 'activeslot', $endpos, $default_timeout,
+    'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
     'include-xids'     => '0',
     'skip-empty-xacts' => '1');
 chomp($stdout_recv);
@@ -302,28 +355,20 @@ $node_primary->safe_psql('postgres', 'CREATE DATABASE 
otherdb');
 
 is( $node_primary->psql(
         'otherdb',
-        "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL) ORDER BY lsn DESC LIMIT 1;"
+        "SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', 
NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
     ),
     3,
     'replaying logical slot from another database fails');
 
-# drop the logical slots
-drop_logical_slots();
-
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
 # Scenario 1: hot_standby_feedback off and vacuum FULL
 ##################################################
 
-# create the logical slots
-create_logical_slots($node_standby);
-
 # One way to produce recovery conflict is to create/drop a relation and
 # launch a vacuum full on pg_class with hot_standby_feedback turned off on
 # the standby.
-change_hot_standby_feedback_and_wait_for_xmins(0,1);
-
-$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+reactive_slots_change_hfs_and_wait_for_xmins('behaves_ok_', 'vacuum_full_', 0, 
1);
 
 # This should trigger the conflict
 $node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y 
text);]);
@@ -332,34 +377,33 @@ $node_primary->safe_psql('testdb', 'VACUUM full 
pg_class;');
 
 $node_primary->wait_for_replay_catchup($node_standby);
 
-# message should be issued
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"inactiveslot\""),
-  'inactiveslot slot invalidation is logged with vacuum FULL on pg_class');
-
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"activeslot\""),
-  'activeslot slot invalidation is logged with vacuum FULL on pg_class');
-
-# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
-ok( $node_standby->poll_query_until(
-       'postgres',
-       "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
-       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
 
 # Verify slots are reported as conflicting in pg_replication_slots
 check_slots_conflicting_status(1);
 
-$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+$handle = make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, 
\$stderr);
 
 # We are not able to read from the slot as it has been invalidated
-check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"vacuum_full_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1,1);
 
+##################################################
+# Verify that invalidated logical slots stay invalidated across a restart.
+##################################################
+$node_standby->restart;
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+##################################################
+# Verify that invalidated logical slots do not lead to retaining WAL
+##################################################
+# XXXXX TODO
+
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
 # Scenario 2: conflict due to row removal with hot_standby_feedback off.
@@ -368,17 +412,9 @@ change_hot_standby_feedback_and_wait_for_xmins(1,1);
 # get the position to search from in the standby logfile
 my $logstart = -s $node_standby->logfile;
 
-# drop the logical slots
-drop_logical_slots();
-
-# create the logical slots
-create_logical_slots($node_standby);
-
 # One way to produce recovery conflict is to create/drop a relation and
 # launch a vacuum on pg_class with hot_standby_feedback turned off on the 
standby.
-change_hot_standby_feedback_and_wait_for_xmins(0,1);
-
-$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+reactive_slots_change_hfs_and_wait_for_xmins('vacuum_full_', 'row_removal_', 
0, 1);
 
 # This should trigger the conflict
 $node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y 
text);]);
@@ -387,31 +423,16 @@ $node_primary->safe_psql('testdb', 'VACUUM pg_class;');
 
 $node_primary->wait_for_replay_catchup($node_standby);
 
-# message should be issued
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
-  'inactiveslot slot invalidation is logged with vacuum on pg_class');
-
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"activeslot\"", $logstart),
-  'activeslot slot invalidation is logged with vacuum on pg_class');
-
-# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
-# we now expect 2 conflicts reported as the counter persist across reloads
-ok( $node_standby->poll_query_until(
-       'postgres',
-       "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
-       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
 
 # Verify slots are reported as conflicting in pg_replication_slots
 check_slots_conflicting_status(1);
 
-$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+$handle = make_slot_active($node_standby, 'row_removal_', 0, \$stdout, 
\$stderr);
 
 # We are not able to read from the slot as it has been invalidated
-check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"row_removal_activeslot\"");
 
 ##################################################
 # Recovery conflict: Same as Scenario 2 but on a shared catalog table
@@ -421,17 +442,9 @@ check_pg_recvlogical_stderr($handle, "cannot read from 
logical replication slot
 # get the position to search from in the standby logfile
 $logstart = -s $node_standby->logfile;
 
-# drop the logical slots
-drop_logical_slots();
-
-# create the logical slots
-create_logical_slots($node_standby);
-
 # One way to produce recovery conflict is to create/drop a relation and
 # launch a vacuum on pg_class with hot_standby_feedback turned off on the 
standby.
-change_hot_standby_feedback_and_wait_for_xmins(0,1);
-
-$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+reactive_slots_change_hfs_and_wait_for_xmins('row_removal_', 
'shared_row_removal_', 0, 1);
 
 # Trigger the conflict. The txid_current() is to ensure there's some WAL
 # record associated with the database, otherwise the wait below does not work
@@ -445,31 +458,16 @@ $node_primary->safe_psql('testdb', 'VACUUM pg_authid;');
 
 $node_primary->wait_for_replay_catchup($node_standby);
 
-# message should be issued
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
-  'inactiveslot slot invalidation is logged with vacuum on pg_authid');
-
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"activeslot\"", $logstart),
-  'activeslot slot invalidation is logged with vacuum on pg_authid');
-
-# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
-# we now expect 2 conflicts reported as the counter persist across reloads
-ok( $node_standby->poll_query_until(
-       'postgres',
-       "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
-       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('shared_row_removal_', $logstart, 'with vacuum on 
pg_authid');
 
 # Verify slots are reported as conflicting in pg_replication_slots
 check_slots_conflicting_status(1);
 
-$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+$handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout, 
\$stderr);
 
 # We are not able to read from the slot as it has been invalidated
-check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"shared_row_removal_activeslot\"");
 
 ##################################################
 # Recovery conflict: Same as Scenario 2 but on a non catalog table
@@ -479,41 +477,30 @@ check_pg_recvlogical_stderr($handle, "cannot read from 
logical replication slot
 # get the position to search from in the standby logfile
 $logstart = -s $node_standby->logfile;
 
-# drop the logical slots
-drop_logical_slots();
-
-# create the logical slots
-create_logical_slots($node_standby);
-
-# put hot standby feedback to off
-change_hot_standby_feedback_and_wait_for_xmins(0,1);
-
-$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+reactive_slots_change_hfs_and_wait_for_xmins('shared_row_removal_', 'no_conflict_', 0, 1);
 
 # This should not trigger a conflict
-$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
-$node_primary->safe_psql('testdb', qq[INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
-$node_primary->safe_psql('testdb', qq[UPDATE conflict_test set x=1, y=1;]);
-$node_primary->safe_psql('testdb', 'VACUUM conflict_test;');
-
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);
+                                     INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;
+                                     UPDATE conflict_test set x=1, y=1;
+                                     VACUUM conflict_test;]);
 $node_primary->wait_for_replay_catchup($node_standby);
 
 # message should not be issued
 ok( !find_in_log(
    $node_standby,
-  "invalidating obsolete slot \"inactiveslot\"", $logstart),
+  "invalidating obsolete slot \"no_conflict_inactiveslot\"", $logstart),
   'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
 
 ok( !find_in_log(
    $node_standby,
-  "invalidating obsolete slot \"activeslot\"", $logstart),
+  "invalidating obsolete slot \"no_conflict_activeslot\"", $logstart),
   'activeslot slot invalidation is not logged with vacuum on conflict_test');
 
 # Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated
-# we now still expect 2 conflicts reported as the counter persist across reloads
 ok( $node_standby->poll_query_until(
        'postgres',
-       "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       "select (confl_active_logicalslot = 0) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
        'confl_active_logicalslot not updated') or die "Timed out waiting confl_active_logicalslot to be updated";
 
 # Verify slots are reported as non conflicting in pg_replication_slots
@@ -522,6 +509,9 @@ check_slots_conflicting_status(0);
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 0);
 
+# Restart the standby node to ensure no slots are still active
+$node_standby->restart;
+
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
 # Scenario 4: conflict due to on-access pruning.
@@ -530,17 +520,9 @@ change_hot_standby_feedback_and_wait_for_xmins(1, 0);
 # get the position to search from in the standby logfile
 $logstart = -s $node_standby->logfile;
 
-# drop the logical slots
-drop_logical_slots();
-
-# create the logical slots
-create_logical_slots($node_standby);
-
 # One way to produce recovery conflict is to trigger an on-access pruning
 # on a relation marked as user_catalog_table.
-change_hot_standby_feedback_and_wait_for_xmins(0,0);
-
-$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+reactive_slots_change_hfs_and_wait_for_xmins('no_conflict_', 'pruning_', 0, 0);
 
 # This should trigger the conflict
 $node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
@@ -552,31 +534,16 @@ $node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
 
 $node_primary->wait_for_replay_catchup($node_standby);
 
-# message should be issued
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
-  'inactiveslot slot invalidation is logged with on-access pruning');
-
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"activeslot\"", $logstart),
-  'activeslot slot invalidation is logged with on-access pruning');
-
-# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
-# we now expect 3 conflicts reported as the counter persist across reloads
-ok( $node_standby->poll_query_until(
-       'postgres',
-       "select (confl_active_logicalslot = 4) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
-       'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('pruning_', $logstart, 'with on-access pruning');
 
 # Verify slots are reported as conflicting in pg_replication_slots
 check_slots_conflicting_status(1);
 
-$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+$handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
 
 # We are not able to read from the slot as it has been invalidated
-check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"pruning_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -590,12 +557,15 @@ change_hot_standby_feedback_and_wait_for_xmins(1, 1);
 $logstart = -s $node_standby->logfile;
 
 # drop the logical slots
-drop_logical_slots();
+drop_logical_slots('pruning_');
 
 # create the logical slots
-create_logical_slots($node_standby);
+create_logical_slots($node_standby, 'wal_level_');
+
+$handle = make_slot_active($node_standby, 'wal_level_', 1, \$stdout, \$stderr);
 
-$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
+$node_standby->psql('testdb', q[select pg_stat_reset();]);
 
 # Make primary wal_level replica. This will trigger slot conflict.
 $node_primary->append_conf('postgresql.conf',q[
@@ -605,28 +575,13 @@ $node_primary->restart;
 
 $node_primary->wait_for_replay_catchup($node_standby);
 
-# message should be issued
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
-  'inactiveslot slot invalidation is logged due to wal_level');
-
-ok( find_in_log(
-   $node_standby,
-  "invalidating obsolete replication slot \"activeslot\"", $logstart),
-  'activeslot slot invalidation is logged due to wal_level');
-
-# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
-# we now expect 5 conflicts reported as the counter persist across reloads
-ok( $node_standby->poll_query_until(
-       'postgres',
-       "select (confl_active_logicalslot = 5) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
-       'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('wal_level_', $logstart, 'due to wal_level');
 
 # Verify slots are reported as conflicting in pg_replication_slots
 check_slots_conflicting_status(1);
 
-$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+$handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # We are not able to read from the slot as it requires wal_level at least logical on the primary server
 check_pg_recvlogical_stderr($handle, "logical decoding on standby requires wal_level to be at least logical on the primary server");
 
@@ -637,21 +592,22 @@ wal_level = 'logical'
 $node_primary->restart;
 $node_primary->wait_for_replay_catchup($node_standby);
 
-$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+$handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # as the slot has been invalidated we should not be able to read
-check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"wal_level_activeslot\"");
 
 ##################################################
 # DROP DATABASE should drops it's slots, including active slots.
 ##################################################
 
 # drop the logical slots
-drop_logical_slots();
+drop_logical_slots('wal_level_');
 
 # create the logical slots
-create_logical_slots($node_standby);
+create_logical_slots($node_standby, 'drop_db_');
+
+$handle = make_slot_active($node_standby, 'drop_db_', 1, \$stdout, \$stderr);
 
-$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
 # Create a slot on a database that would not be dropped. This slot should not
 # get dropped.
 $node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres');
@@ -665,7 +621,7 @@ is($node_standby->safe_psql('postgres',
        q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
        'database dropped on standby');
 
-check_slots_dropped($handle);
+check_slots_dropped('drop_db', $handle);
 
 is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
        'otherslot on standby not dropped');
@@ -684,14 +640,14 @@ $node_primary->psql('postgres', q[CREATE DATABASE testdb]);
 $node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
 
 # create the logical slots
-create_logical_slots($node_standby);
+create_logical_slots($node_standby, 'promotion_');
 
 # create the logical slots on the cascading standby too
-create_logical_slots($node_cascading_standby);
+create_logical_slots($node_cascading_standby, 'promotion_');
 
 # Make slots actives
-$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
-my $cascading_handle = make_slot_active($node_cascading_standby, 1, \$cascading_stdout, \$cascading_stderr);
+$handle = make_slot_active($node_standby, 'promotion_', 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 'promotion_', 1, \$cascading_stdout, \$cascading_stderr);
 
 # Insert some rows before the promotion
 $node_primary->safe_psql('testdb',
@@ -727,7 +683,7 @@ COMMIT};
 
 # check that we are decoding pre and post promotion inserted rows
 $stdout_sql = $node_standby->safe_psql('testdb',
-       qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+       qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
 );
 
 is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby');
@@ -746,7 +702,7 @@ is($stdout, $expected,
 
 # check that we are decoding pre and post promotion inserted rows on the cascading standby
 $stdout_sql = $node_cascading_standby->safe_psql('testdb',
-       qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+       qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
 );
 
 is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby');
