Hi,

On 11/25/22 11:26 AM, Drouvot, Bertrand wrote:
Hi,

On 9/30/22 2:11 PM, Drouvot, Bertrand wrote:
Hi,

On 7/6/22 3:30 PM, Drouvot, Bertrand wrote:
Hi,

On 10/28/21 11:07 PM, Andres Freund wrote:
Hi,

On 2021-10-28 16:24:22 -0400, Robert Haas wrote:
On Wed, Oct 27, 2021 at 2:56 AM Drouvot, Bertrand <bdrou...@amazon.com> wrote:
So you have in mind checking XLogLogicalInfoActive() first and, if true, 
opening the relation and calling
RelationIsAccessibleInLogicalDecoding()?
I think 0001 is utterly unacceptable. We cannot add calls to
table_open() in low-level functions like this. Suppose for example
that _bt_getbuf() calls _bt_log_reuse_page() which with 0001 applied
would call get_rel_logical_catalog(). _bt_getbuf() will have acquired
a buffer lock on the page. The idea that it's safe to call
table_open() while holding a buffer lock cannot be taken seriously.
Yes - that's pretty clearly a deadlock hazard. It shouldn't be too hard to fix, I
think. Possibly a bit more verbose than nice, but...

Alternatively we could propagate the information whether a relcache entry is
for a catalog from the table to the index. Then we'd not need to change the
btree code to pass the table down.

Looking closer at RelationIsAccessibleInLogicalDecoding(), it seems to me that the 
missing piece for telling whether or not an index is for a catalog is the 
rd_options->user_catalog_table value of its related heap relation.

Then, a way to achieve that could be to:

- Add to Relation a new "heap_rd_options" field representing the rd_options of 
the related heap relation, when appropriate

- Trigger relcache invalidations of the related indexes when 
ATExecSetRelOptions() is run on a heap relation

- Write an equivalent of RelationIsUsedAsCatalogTable() for indexes that would 
make use of heap_rd_options instead

Does that sound like a valid option to you, or do you have another idea in mind 
for propagating the information about whether a relcache entry is for a catalog 
from the table to the index?


I ended up with the attached proposal to propagate the catalog information to 
the indexes.

The attached adds a new field "isusercatalog" in pg_index to indicate whether 
or not the index is linked to a table that has the storage parameter user_catalog_table 
set to true.

Then it defines new macros, including "IndexIsAccessibleInLogicalDecoding", 
that make use of this new field.

This new macro replaces get_rel_logical_catalog(), which was part of the previous 
patch version.
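
For illustration, here's a minimal sketch of what such a macro could look like, 
mirroring the existing RelationIsAccessibleInLogicalDecoding() in utils/rel.h. 
The field name used below to expose the new pg_index flag through the relcache 
is only an assumption for this sketch; the attached patch is authoritative:

/* sketch only: assumes the new pg_index flag is exposed via rd_index */
#define IndexIsUserCatalog(relation) \
	((relation)->rd_index->indisusercatalog)

#define IndexIsAccessibleInLogicalDecoding(relation) \
	(XLogLogicalInfoActive() && \
	 RelationNeedsWAL(relation) && \
	 (IsCatalogRelation(relation) || IndexIsUserCatalog(relation)))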

What do you think about this approach and the attached?

If that sounds reasonable, I'll add TAP tests for it and try to improve the way 
isusercatalog is propagated to the index(es) when a RESET is done on 
user_catalog_table on the table. Currently, this POC patch hardcodes it to 
"false", which is the default value for user_catalog_table in boolRelOpts[]; a 
better approach would probably be to retrieve the value from the table once the 
reset is done and then propagate it to the index(es).

Please find attached a rebase to propagate the catalog information to the 
indexes.
It also takes care of the RESET on user_catalog_table (adding a new macro 
"HEAP_DEFAULT_USER_CATALOG_TABLE") and adds a few tests in 
contrib/test_decoding/sql/ddl.sql.

Please find attached a new patch series:

v27-0001-Add-info-in-WAL-records-in-preparation-for-logic.patch
v27-0002-Handle-logical-slot-conflicts-on-standby.patch
v27-0003-Allow-logical-decoding-on-standby.patch
v27-0004-New-TAP-test-for-logical-decoding-on-standby.patch
v27-0005-Doc-changes-describing-details-about-logical-dec.patch
v27-0006-Fixing-Walsender-corner-case-with-logical-decodi.patch

with the previous comments addressed, mainly:

1/ Don't call table_open() in low-level functions in 0001: this is done with a new field 
"isusercatalog" in pg_index indicating whether or not the index is linked to a table 
that has the storage parameter user_catalog_table set to true (we may want to make this field 
"invisible" though). This new field is then used in the new 
IndexIsAccessibleInLogicalDecoding macro (through IndexIsUserCatalog).
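
As a rough illustration of why this removes the table_open() call (a 
hypothetical sketch, not an actual 0001 hunk): a routine that builds a WAL 
record while holding a buffer lock, such as _bt_log_reuse_page(), can now fill 
the conflict-handling flag directly from the index's own relcache entry, for 
example:

	/*
	 * sketch: "rel" is the index relation already at hand, so no heap
	 * relation lookup is needed while the buffer lock is held.
	 */
	xlrec_reuse.onCatalogAccessibleInLogicalDecoding =
		IndexIsAccessibleInLogicalDecoding(rel);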

2/ Rename the new field generated in the xlog records (used to arrange conflict handling) from 
"onCatalogTable" to "onCatalogAccessibleInLogicalDecoding" to avoid any 
confusion (see 0001).

3/ Make sure that "currTLI" is the current timeline in logical_read_xlog_page() 
(see 0003).

4/ Fix the walsender/startup process corner case: this is done in 0006 (I thought it 
better to keep the other patches purely "feature" related and to address this 
corner case separately to ease the review). The fix makes use of a new 
condition variable "replayedCV" so that the startup process can wake up the 
walsender(s) with a broadcast once a record has been replayed.
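
For context, here is a minimal sketch of the pattern 0006 relies on, using the 
existing ConditionVariable API (the actual hunks are in the attached patch; 
"loc" below stands for the LSN WalSndWaitForWal() is waiting for): the startup 
process calls ConditionVariableBroadcast(&replayedCV) right after updating 
lastReplayedEndRecPtr, and a logical walsender on a standby waits roughly like 
this:

	ConditionVariablePrepareToSleep(replayedCV);
	while (loc > GetXLogReplayRecPtr(NULL))
	{
		/*
		 * Woken by the startup process's broadcast; use a timeout so we
		 * can still react to a shutdown request.
		 */
		ConditionVariableTimedSleep(replayedCV, 1000 /* ms */,
									WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
	}
	ConditionVariableCancelSleep();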

Remarks:

- The new confl_active_logicalslot field added in pg_stat_database_conflicts 
(see 0002) is incremented only if the slot being invalidated is active (I think 
that makes more sense with regard to the other fields too). In both cases 
(active or not) the slot invalidation is reported in the logfile. The 
documentation update mentions this behavior (see 0002).

- Moving LogStandbySnapshot() outside of the loop in 
ReplicationSlotReserveWal() (see 0003) was proposed by Andres in [1], and 
I think it makes sense.

- The TAP tests (see 0004) cover: correct behavior of logical decoding on the 
standby, conflicts, slot invalidations, and standby promotion.


Looking forward to your feedback,


[1]: 
https://www.postgresql.org/message-id/20210406180231.qsnkyrgrm7gtxb73%40alap3.anarazel.de

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From d59f72dc62cf2922800a99ac001fb7c9aeaaab72 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Thu, 1 Dec 2022 12:34:59 +0000
Subject: [PATCH v27 6/6] Fixing Walsender corner case with logical decoding on
 standby.

The problem is that WalSndWaitForWal() waits for the *replay* LSN to
increase, but gets woken up by walreceiver when new WAL has been
flushed. This means that walsenders typically get woken up at the same
time as the startup process, so by the time the logical walsender
checks GetXLogReplayRecPtr() it's unlikely that the startup process has
already replayed the record and updated XLogCtl->lastReplayedEndRecPtr.

Introduce a new condition variable to fix this corner case.
---
 src/backend/access/transam/xlogrecovery.c | 28 ++++++++++++++++++++
 src/backend/replication/walsender.c       | 31 +++++++++++++++++------
 src/backend/utils/activity/wait_event.c   |  3 +++
 src/include/access/xlogrecovery.h         |  3 +++
 src/include/replication/walsender.h       |  1 +
 src/include/utils/wait_event.h            |  1 +
 6 files changed, 59 insertions(+), 8 deletions(-)
  41.2% src/backend/access/transam/
  48.5% src/backend/replication/
   3.6% src/backend/utils/activity/
   3.4% src/include/access/

diff --git a/src/backend/access/transam/xlogrecovery.c 
b/src/backend/access/transam/xlogrecovery.c
index 41ffc57da9..0337fdc77a 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
        RecoveryPauseState recoveryPauseState;
        ConditionVariable recoveryNotPausedCV;
 
+       /* Replay state (see getReplayedCV() for more explanation) */
+       ConditionVariable replayedCV;
+
        slock_t         info_lck;               /* locks shared variables shown 
above */
 } XLogRecoveryCtlData;
 
@@ -467,6 +470,7 @@ XLogRecoveryShmemInit(void)
        SpinLockInit(&XLogRecoveryCtl->info_lck);
        InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
        ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
+       ConditionVariableInit(&XLogRecoveryCtl->replayedCV);
 }
 
 /*
@@ -1916,6 +1920,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord 
*record, TimeLineID *repl
        XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
        SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+       /*
+        * wake up walsender(s) used by logical decoding on standby.
+        */
+       ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV);
+
        /*
         * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
         * receiver so that it notices the updated lastReplayedEndRecPtr and 
sends
@@ -4911,3 +4920,22 @@ assign_recovery_target_xid(const char *newval, void 
*extra)
        else
                recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Return the ConditionVariable indicating that a replay has been done.
+ *
+ * This is needed for logical decoding on standby. Indeed the "problem" is that
+ * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up
+ * by walreceiver when new WAL has been flushed. Which means that typically
+ * walsenders will get woken up at the same time that the startup process
+ * will be - which means that by the time the logical walsender checks
+ * GetXLogReplayRecPtr() it's unlikely that the startup process already 
replayed
+ * the record and updated XLogCtl->lastReplayedEndRecPtr.
+ *
+ * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case.
+ */
+ConditionVariable *
+getReplayedCV(void)
+{
+       return &XLogRecoveryCtl->replayedCV;
+}
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 9662e316c9..8c8dbe812f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1548,6 +1548,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 {
        int                     wakeEvents;
        static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+       ConditionVariable *replayedCV = getReplayedCV();
 
        /*
         * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1566,7 +1567,6 @@ WalSndWaitForWal(XLogRecPtr loc)
 
        for (;;)
        {
-               long            sleeptime;
 
                /* Clear any already-pending wakeups */
                ResetLatch(MyLatch);
@@ -1650,20 +1650,35 @@ WalSndWaitForWal(XLogRecPtr loc)
                WalSndKeepaliveIfNecessary();
 
                /*
-                * Sleep until something happens or we time out.  Also wait for 
the
-                * socket becoming writable, if there's still pending output.
+                * When not in recovery, sleep until something happens or we 
time out.
+                * Also wait for the socket becoming writable, if there's still 
pending output.
                 * Otherwise we might sit on sendable output data while waiting 
for
                 * new WAL to be generated.  (But if we have nothing to send, 
we don't
                 * want to wake on socket-writable.)
                 */
-               sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+               if (!RecoveryInProgress())
+               {
+                       long            sleeptime;
+                       sleeptime = 
WalSndComputeSleeptime(GetCurrentTimestamp());
 
-               wakeEvents = WL_SOCKET_READABLE;
+                       wakeEvents = WL_SOCKET_READABLE;
 
-               if (pq_is_send_pending())
-                       wakeEvents |= WL_SOCKET_WRITEABLE;
+                       if (pq_is_send_pending())
+                               wakeEvents |= WL_SOCKET_WRITEABLE;
 
-               WalSndWait(wakeEvents, sleeptime, 
WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+                       WalSndWait(wakeEvents, sleeptime * 10, 
WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+               }
+               else
+               /*
+                * We are in the logical decoding on standby case.
+                * We are waiting for the startup process to replay wal 
record(s) using
+                * a timeout in case we are requested to stop.
+                */
+               {
+                       ConditionVariablePrepareToSleep(replayedCV);
+                       ConditionVariableTimedSleep(replayedCV, 1000,
+                                                                               
WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
+               }
        }
 
        /* reactivate latch so WalSndLoop knows to continue */
diff --git a/src/backend/utils/activity/wait_event.c 
b/src/backend/utils/activity/wait_event.c
index b2abd75ddb..3f6059805a 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -457,6 +457,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
                case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
                        event_name = "WalReceiverWaitStart";
                        break;
+               case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY:
+                       event_name = "WalSenderWaitReplay";
+                       break;
                case WAIT_EVENT_XACT_GROUP_UPDATE:
                        event_name = "XactGroupUpdate";
                        break;
diff --git a/src/include/access/xlogrecovery.h 
b/src/include/access/xlogrecovery.h
index f3398425d8..0afd57ecac 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -15,6 +15,7 @@
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
 #include "utils/timestamp.h"
+#include "storage/condition_variable.h"
 
 /*
  * Recovery target type.
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char 
*param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern ConditionVariable *getReplayedCV(void);
+
 #endif                                                 /* XLOGRECOVERY_H */
diff --git a/src/include/replication/walsender.h 
b/src/include/replication/walsender.h
index 8336a6e719..550ef3107f 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_H
 
 #include <signal.h>
+#include "storage/condition_variable.h"
 
 /*
  * What to do with a snapshot in create replication slot command.
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 0b2100be4a..30c2cf35ae 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -128,6 +128,7 @@ typedef enum
        WAIT_EVENT_SYNC_REP,
        WAIT_EVENT_WAL_RECEIVER_EXIT,
        WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+       WAIT_EVENT_WAL_SENDER_WAIT_REPLAY,
        WAIT_EVENT_XACT_GROUP_UPDATE
 } WaitEventIPC;
 
-- 
2.34.1

From 018c7526fe21a0df01e27450b80a41023f31750f Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Thu, 1 Dec 2022 10:45:09 +0000
Subject: [PATCH v27 5/6] Doc changes describing details about logical
 decoding.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 doc/src/sgml/logicaldecoding.sgml | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)
 100.0% doc/src/sgml/

diff --git a/doc/src/sgml/logicaldecoding.sgml 
b/doc/src/sgml/logicaldecoding.sgml
index 38ee69dccc..9acf16037a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,27 @@ postgres=# select * from 
pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>
 
+    <para>
+     A logical replication slot can also be created on a hot standby. To 
prevent
+     <command>VACUUM</command> from removing required rows from the system
+     catalogs, <varname>hot_standby_feedback</varname> should be set on the
+     standby. In spite of that, if any required rows get removed, the slot gets
+     invalidated. It's highly recommended to use a physical slot between the 
primary
+     and the standby. Otherwise, hot_standby_feedback will work, but only 
while the
+     connection is alive (for example a node restart would break it). Existing
+     logical slots on standby also get invalidated if wal_level on primary is 
reduced to
+     less than 'logical'.
+    </para>
+
+    <para>
+     Creating a logical slot requires building a historic snapshot, for which
+     information about all currently running transactions is essential. On
+     primary, this information is available, but on standby, this information
+     has to be obtained from primary. So, slot creation may wait for some
+     activity to happen on the primary. If the primary is idle, creating a
+     logical slot on standby may take a noticeable time.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
-- 
2.34.1

From 3212f47c0a73425342c22ecd56dcdf5b32b0d49a Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Thu, 1 Dec 2022 10:28:18 +0000
Subject: [PATCH v27 4/6] New TAP test for logical decoding on standby.

Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 ++
 src/test/recovery/meson.build                 |   1 +
 .../t/034_standby_logical_decoding.pl         | 500 ++++++++++++++++++
 3 files changed, 538 insertions(+)
   5.9% src/test/perl/PostgreSQL/Test/
  93.7% src/test/recovery/t/

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm 
b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 7411188dc8..171dc85388 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3037,6 +3037,43 @@ $SIG{TERM} = $SIG{INT} = sub {
 
 =pod
 
+=item $node->create_logical_slot_on_standby(self, master, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+       my ($self, $master, $slot_name, $dbname) = @_;
+       my ($stdout, $stderr);
+
+       my $handle;
+
+       $handle = IPC::Run::start(['pg_recvlogical', '-d', 
$self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, 
'--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+       # Once the slot's restart_lsn is determined, the standby looks for an
+       # xl_running_xacts WAL record from the restart_lsn onwards. So first,
+       # wait until the slot's restart_lsn has been evaluated.
+
+       $self->poll_query_until(
+               'postgres', qq[
+               SELECT restart_lsn IS NOT NULL
+               FROM pg_catalog.pg_replication_slots WHERE slot_name = 
'$slot_name'
+       ]) or die "timed out waiting for logical slot to calculate its 
restart_lsn";
+
+       # Now arrange for the xl_running_xacts record for which pg_recvlogical
+       # is waiting.
+       $master->safe_psql('postgres', 'CHECKPOINT');
+
+       $handle->finish();
+
+       is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on 
standby created')
+               or die "could not create slot" . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b0e398363f..d68ee9b663 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -38,6 +38,7 @@ tests += {
       't/031_recovery_conflict.pl',
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
+      't/034_standby_logical_decoding.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/034_standby_logical_decoding.pl 
b/src/test/recovery/t/034_standby_logical_decoding.pl
new file mode 100644
index 0000000000..e77f9a6436
--- /dev/null
+++ b/src/test/recovery/t/034_standby_logical_decoding.pl
@@ -0,0 +1,500 @@
+# Logical decoding on standby: test logical decoding,
+# recovery conflicts and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use Test::More tests => 38;
+use Time::HiRes qw(usleep);
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+       my ($node) = @_;
+
+       return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+       my ($node, $pat, $off) = @_;
+
+       $off = 0 unless defined $off;
+       my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+       return 0 if (length($log) <= $off);
+
+       $log = substr($log, $off);
+
+       return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+       my ($node, $slotname, $check_expr) = @_;
+
+       $node->poll_query_until(
+               'postgres', qq[
+               SELECT $check_expr
+               FROM pg_catalog.pg_replication_slots
+               WHERE slot_name = '$slotname';
+       ]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+       $node_standby->create_logical_slot_on_standby($node_primary, 
'inactiveslot', 'testdb');
+       $node_standby->create_logical_slot_on_standby($node_primary, 
'activeslot', 'testdb');
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# In case wait is true we are waiting for an active pid on the 'activeslot' 
slot.
+# If wait is not true it means we are testing a known failure scenario.
+sub make_slot_active
+{
+       my $wait = shift;
+       my $slot_user_handle;
+
+       print "starting pg_recvlogical\n";
+       $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', 
$node_standby->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop', 
'--start'], '>', \$stdout, '2>', \$stderr);
+
+       if ($wait)
+       # make sure activeslot is in use
+       {
+               $node_standby->poll_query_until('testdb',
+                       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots 
WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+               ) or die "slot never became active";
+       }
+
+       return $slot_user_handle;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+       my ($slot_user_handle) = @_;
+       my $return;
+
+       is($node_standby->slot('inactiveslot')->{'slot_type'}, '', 
'inactiveslot on standby dropped');
+       is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on 
standby dropped');
+
+       # our client should've terminated in response to the walsender error
+       $slot_user_handle->finish;
+       $return = $?;
+       cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+       if ($return) {
+               like($stderr, qr/conflict with recovery/, 'slot has been dropped');
+       }
+
+       return 0;
+}
+
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM 
pg_create_physical_replication_slot('$primary_slotname');]);
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+       $node_primary, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+create_logical_slots();
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y 
text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, 
s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+my $result = $node_standby->safe_psql('testdb',
+       qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+       14, 'Decoding produced 14 rows');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+       "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) 
ORDER BY lsn DESC LIMIT 1;"
+);
+print "waiting to replay $endpos\n";
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, 180,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 
'activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, 180,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+        'otherdb',
+        "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL) ORDER BY lsn DESC LIMIT 1;"
+    ),
+    3,
+    'replaying logical slot from another database fails');
+
+# drop the logical slots
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+create_logical_slots();
+
+# One way to reproduce recovery conflict is to run VACUUM FULL with
+# hot_standby_feedback turned off on the standby.
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_standby->restart;
+# ensure walreceiver feedback off by waiting for expected xmin and
+# catalog_xmin on primary. Both should be NULL since hs_feedback is off
+wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NULL AND catalog_xmin IS NULL");
+
+$handle = make_slot_active(1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', 'VACUUM FULL');
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"inactiveslot\" because it conflicts with recovery"),
+  'inactiveslot slot invalidation is logged with vacuum FULL');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"activeslot\" because it conflicts with recovery"),
+  'activeslot slot invalidation is logged with vacuum FULL');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+$handle = make_slot_active(0);
+usleep(1000_000);
+
+# We are not able to read from the slot as it has been invalidated
+ok( find_in_log(
+   $node_standby,
+  "cannot read from logical replication slot \"activeslot\""),
+  'cannot read from logical replication slot');
+
+# Turn hot_standby_feedback back on
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on primary. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = get_log_size($node_standby);
+
+# drop the logical slots
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+
+create_logical_slots();
+
+# One way to produce recovery conflict is to create/drop a relation and launch 
a vacuum
+# with hot_standby_feedback turned off on the standby.
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_standby->restart;
+# ensure walreceiver feedback off by waiting for expected xmin and
+# catalog_xmin on primary. Both should be NULL since hs_feedback is off
+wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NULL AND catalog_xmin IS NULL");
+
+$handle = make_slot_active(1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y 
text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM');
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"inactiveslot\" because it conflicts with recovery", 
$logstart),
+  'inactiveslot slot invalidation is logged due to row removal');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"activeslot\" because it conflicts with recovery", 
$logstart),
+  'activeslot slot invalidation is logged due to row removal');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+# we now expect 2 conflicts reported as the counter persists across restarts
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+$handle = make_slot_active(0);
+usleep(1000_000);
+
+# We are not able to read from the slot as it has been invalidated
+ok( find_in_log(
+   $node_standby,
+  "cannot read from logical replication slot \"activeslot\"", $logstart),
+  'cannot read from logical replication slot');
+
+# Turn hot_standby_feedback back on
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on primary. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 3: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = get_log_size($node_standby);
+
+# drop the logical slots
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+
+create_logical_slots();
+
+$handle = make_slot_active(1);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"inactiveslot\" because it conflicts with recovery", 
$logstart),
+  'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"activeslot\" because it conflicts with recovery", 
$logstart),
+  'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+# we now expect 3 conflicts reported as the counter persists across restarts
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+$handle = make_slot_active(0);
+usleep(1000_000);
+
+ok( find_in_log(
+   $node_standby,
+  "logical decoding on standby requires wal_level to be at least logical on 
master", $logstart),
+  'cannot start replication because wal_level < logical on master');
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+$handle = make_slot_active(0);
+usleep(1000_000);
+
+# as the slot has been invalidated we should not be able to read
+ok( find_in_log(
+   $node_standby,
+  "cannot read from logical replication slot \"activeslot\"", $logstart),
+  'cannot read from logical replication slot');
+
+##################################################
+# DROP DATABASE should drop its slots, including active slots.
+##################################################
+
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+create_logical_slots();
+$handle = make_slot_active(1);
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 
'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+is($node_standby->safe_psql('postgres',
+       q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 
'f',
+       'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+       'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y 
text);]);
+
+# create the logical slots
+create_logical_slots();
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(1,4) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(5,7) s;]
+);
+
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on 
promoted standby');
-- 
2.34.1

From 18798f28008cf8aada4c4b84edaf8880066b548a Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Thu, 1 Dec 2022 10:26:14 +0000
Subject: [PATCH v27 3/6] Allow logical decoding on standby.

Allow a logical slot to be created on a standby. Restrict its usage
or its creation if wal_level on the primary is less than logical.
During slot creation, its restart_lsn is set to the last replayed
LSN. Effectively, logical slot creation on a standby waits for an
xl_running_xacts record to arrive from the primary. Conflicting slots
are handled in the next commits.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 src/backend/access/transam/xlog.c         | 11 ++++
 src/backend/replication/logical/decode.c  | 22 ++++++-
 src/backend/replication/logical/logical.c | 37 +++++++-----
 src/backend/replication/slot.c            | 73 +++++++++++++++--------
 src/backend/replication/walsender.c       | 27 +++++----
 src/include/access/xlog.h                 |  1 +
 6 files changed, 118 insertions(+), 53 deletions(-)
   4.5% src/backend/access/transam/
  36.6% src/backend/replication/logical/
  57.9% src/backend/replication/

diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index e4503fb36d..00f021942f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4471,6 +4471,17 @@ LocalProcessControlFile(bool reset)
        ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+       return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index 2cc0ac9eb0..c210721ab0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
                         * can restart from there.
                         */
                        break;
+               case XLOG_PARAMETER_CHANGE:
+               {
+                       xl_parameter_change *xlrec =
+                               (xl_parameter_change *) 
XLogRecGetData(buf->record);
+
+                       /*
+                        * If wal_level on primary is reduced to less than 
logical, then we
+                        * want to prevent existing logical slots from being 
used.
+                        * Existing logical slots on standby get invalidated 
when this WAL
+                        * record is replayed; and further, slot creation fails 
when the
+                        * wal level is not sufficient; but all these 
operations are not
+                        * synchronized, so a logical slot may creep in while 
the wal_level
+                        * is being reduced. Hence this extra check.
+                        */
+                       if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                errmsg("logical decoding on 
standby requires "
+                                                               "wal_level to 
be at least logical on master")));
+                       break;
+               }
                case XLOG_NOOP:
                case XLOG_NEXTOID:
                case XLOG_SWITCH:
                case XLOG_BACKUP_END:
-               case XLOG_PARAMETER_CHANGE:
                case XLOG_RESTORE_POINT:
                case XLOG_FPW_CHANGE:
                case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c 
b/src/backend/replication/logical/logical.c
index 625a7f4273..a9567f2d8c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -119,23 +119,22 @@ CheckLogicalDecodingRequirements(void)
                                
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("logical decoding requires a database 
connection")));
 
-       /* ----
-        * TODO: We got to change that someday soon...
-        *
-        * There's basically three things missing to allow this:
-        * 1) We need to be able to correctly and quickly identify the timeline 
a
-        *        LSN belongs to
-        * 2) We need to force hot_standby_feedback to be enabled at all times 
so
-        *        the primary cannot remove rows we need.
-        * 3) support dropping replication slots referring to a database, in
-        *        dbase_redo. There can't be any active ones due to HS recovery
-        *        conflicts, so that should be relatively easy.
-        * ----
-        */
        if (RecoveryInProgress())
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("logical decoding cannot be used while 
in recovery")));
+       {
+               /*
+                * This check may have race conditions, but whenever
+                * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, 
we
+                * verify that there are no existing logical replication slots. 
And to
+                * avoid races around creating a new slot,
+                * CheckLogicalDecodingRequirements() is called once before 
creating
+                * the slot, and once when logical decoding is initially 
starting up.
+                */
+               if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+                       ereport(ERROR,
+                                       
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("logical decoding on standby 
requires "
+                                                       "wal_level to be at 
least logical on master")));
+       }
 }
 
 /*
@@ -331,6 +330,12 @@ CreateInitDecodingContext(const char *plugin,
        LogicalDecodingContext *ctx;
        MemoryContext old_context;
 
+       /*
+        * On standby, this check is also required while creating the slot. 
Check
+        * the comments in this function.
+        */
+       CheckLogicalDecodingRequirements();
+
        /* shorter lines... */
        slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6a4e2cd19b..f554dac6fd 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -51,6 +51,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "access/xlogrecovery.h"
 
 /*
  * Replication slot on-disk data structure.
@@ -1175,37 +1176,46 @@ ReplicationSlotReserveWal(void)
                /*
                 * For logical slots log a standby snapshot and start logical 
decoding
                 * at exactly that position. That allows the slot to start up 
more
-                * quickly.
+                * quickly. But on a standby we cannot do WAL writes, so just 
use the
+                * replay pointer; effectively, an attempt to create a logical 
slot on
+                * standby will cause it to wait for an xl_running_xact record 
to be
+                * logged independently on the primary, so that a snapshot can 
be built
+                * using the record.
                 *
-                * That's not needed (or indeed helpful) for physical slots as 
they'll
-                * start replay at the last logged checkpoint anyway. Instead 
return
-                * the location of the last redo LSN. While that slightly 
increases
-                * the chance that we have to retry, it's where a base backup 
has to
-                * start replay at.
+                * None of this is needed (or indeed helpful) for physical 
slots as
+                * they'll start replay at the last logged checkpoint anyway. 
Instead
+                * return the location of the last redo LSN. While that slightly
+                * increases the chance that we have to retry, it's where a 
base backup
+                * has to start replay at.
                 */
-               if (!RecoveryInProgress() && SlotIsLogical(slot))
+               if (SlotIsPhysical(slot))
+                       restart_lsn = GetRedoRecPtr();
+               else if (RecoveryInProgress())
                {
-                       XLogRecPtr      flushptr;
-
-                       /* start at current insert position */
-                       restart_lsn = GetXLogInsertRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-
-                       /* make sure we have enough information to start */
-                       flushptr = LogStandbySnapshot();
-
-                       /* and make sure it's fsynced to disk */
-                       XLogFlush(flushptr);
+                       restart_lsn = GetXLogReplayRecPtr(NULL);
+                       /*
+                        * Replay pointer may point one past the end of the 
record. If that
+                        * is a XLOG page boundary, it will not be a valid LSN 
for the
+                        * start of a record, so bump it up past the page 
header.
+                        */
+                       if (!XRecOffIsValid(restart_lsn))
+                       {
+                               if (restart_lsn % XLOG_BLCKSZ != 0)
+                                       elog(ERROR, "invalid replay pointer");
+
+                               /* For the first page of a segment file, it's a 
long header */
+                               if (XLogSegmentOffset(restart_lsn, 
wal_segment_size) == 0)
+                                       restart_lsn += SizeOfXLogLongPHD;
+                               else
+                                       restart_lsn += SizeOfXLogShortPHD;
+                       }
                }
                else
-               {
-                       restart_lsn = GetRedoRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-               }
+                       restart_lsn = GetXLogInsertRecPtr();
+
+               SpinLockAcquire(&slot->mutex);
+               slot->data.restart_lsn = restart_lsn;
+               SpinLockRelease(&slot->mutex);
 
                /* prevent WAL removal as fast as possible */
                ReplicationSlotsComputeRequiredLSN();
@@ -1221,6 +1231,17 @@ ReplicationSlotReserveWal(void)
                if (XLogGetLastRemovedSegno() < segno)
                        break;
        }
+
+       if (!RecoveryInProgress() && SlotIsLogical(slot))
+       {
+               XLogRecPtr      flushptr;
+
+               /* make sure we have enough information to start */
+               flushptr = LogStandbySnapshot();
+
+               /* and make sure it's fsynced to disk */
+               XLogFlush(flushptr);
+       }
 }
 
 /*
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 64fbd52e34..9662e316c9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,14 +906,18 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr 
targetPagePtr, int req
        int                     count;
        WALReadError errinfo;
        XLogSegNo       segno;
-       TimeLineID      currTLI = GetWALInsertionTimeLine();
+       TimeLineID      currTLI;
 
        /*
-        * Since logical decoding is only permitted on a primary server, we know
-        * that the current timeline ID can't be changing any more. If we did 
this
-        * on a standby, we'd have to worry about the values we compute here
-        * becoming invalid due to a promotion or timeline change.
+        * Since logical decoding is also permitted on a standby server, we need
+        * to check if the server is in recovery to decide how to get the 
current
+        * timeline ID (so that it also covers the promotion or timeline change cases).
         */
+       if (!RecoveryInProgress())
+               currTLI = GetWALInsertionTimeLine();
+       else
+               GetXLogReplayRecPtr(&currTLI);
+
        XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
        sendTimeLineIsHistoric = (state->currTLI != currTLI);
        sendTimeLine = state->currTLI;
@@ -3074,10 +3078,12 @@ XLogSendLogical(void)
         * If first time through in this session, initialize flushPtr.  
Otherwise,
         * we only need to update flushPtr if EndRecPtr is past it.
         */
-       if (flushPtr == InvalidXLogRecPtr)
-               flushPtr = GetFlushRecPtr(NULL);
-       else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-               flushPtr = GetFlushRecPtr(NULL);
+       if (flushPtr == InvalidXLogRecPtr ||
+               logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+       {
+               flushPtr = (am_cascading_walsender ?
+                                       GetStandbyFlushRecPtr(NULL) : 
GetFlushRecPtr(NULL));
+       }
 
        /* If EndRecPtr is still past our flushPtr, it means we caught up. */
        if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3168,7 +3174,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
        receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-       *tli = replayTLI;
+       if (tli)
+               *tli = replayTLI;
 
        result = replayPtr;
        if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1fbd48fbda..027e155e8e 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
-- 
2.34.1

From f90eaf818fd17ff809a54473e787ac5a6d10855f Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Thu, 1 Dec 2022 10:15:19 +0000
Subject: [PATCH v27 2/6] Handle logical slot conflicts on standby.

During WAL replay on the standby, when a slot conflict is identified,
invalidate such slots. Also do the same if wal_level on the primary
is reduced to below logical and there are existing logical slots
on the standby. Introduce a new ProcSignalReason value for slot
conflict recovery. Arrange for a new pg_stat_database_conflicts field:
confl_active_logicalslot.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 doc/src/sgml/monitoring.sgml                  |  11 +
 src/backend/access/gist/gistxlog.c            |   2 +
 src/backend/access/hash/hash_xlog.c           |   1 +
 src/backend/access/heap/heapam.c              |   3 +
 src/backend/access/nbtree/nbtxlog.c           |   2 +
 src/backend/access/spgist/spgxlog.c           |   1 +
 src/backend/access/transam/xlog.c             |  13 ++
 src/backend/catalog/system_views.sql          |   3 +-
 .../replication/logical/logicalfuncs.c        |   7 +-
 src/backend/replication/slot.c                | 209 ++++++++++++++++++
 src/backend/replication/walsender.c           |   8 +
 src/backend/storage/ipc/procarray.c           |   4 +
 src/backend/storage/ipc/procsignal.c          |   3 +
 src/backend/storage/ipc/standby.c             |  13 +-
 src/backend/tcop/postgres.c                   |  22 ++
 src/backend/utils/activity/pgstat_database.c  |   4 +
 src/backend/utils/adt/pgstatfuncs.c           |  16 ++
 src/include/catalog/pg_proc.dat               |   5 +
 src/include/pgstat.h                          |   1 +
 src/include/replication/slot.h                |   2 +
 src/include/storage/procsignal.h              |   1 +
 src/include/storage/standby.h                 |   2 +
 src/test/regress/expected/rules.out           |   3 +-
 23 files changed, 331 insertions(+), 5 deletions(-)
   3.7% doc/src/sgml/
   5.0% src/backend/access/transam/
   4.4% src/backend/access/
   3.7% src/backend/replication/logical/
  56.3% src/backend/replication/
   7.2% src/backend/storage/ipc/
   7.6% src/backend/tcop/
   4.1% src/backend/utils/
   5.8% src/include/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index b41b4e2a90..68d1c2479d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4317,6 +4317,17 @@ SELECT pid, wait_event_type, wait_event FROM 
pg_stat_activity WHERE wait_event i
        deadlocks
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of active logical slots in this database that have been
+       invalidated because they conflict with recovery (note that inactive ones
+       are also invalidated but do not increment this counter)
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/gist/gistxlog.c 
b/src/backend/access/gist/gistxlog.c
index b747900a45..75818266a6 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -196,6 +196,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                               
        xldata->onCatalogAccessibleInLogicalDecoding,
                                                                                
        rlocator);
        }
 
@@ -396,6 +397,7 @@ gistRedoPageReuse(XLogReaderState *record)
         */
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                               
                   xlrec->onCatalogAccessibleInLogicalDecoding,
                                                                                
                   xlrec->locator);
 }
 
diff --git a/src/backend/access/hash/hash_xlog.c 
b/src/backend/access/hash/hash_xlog.c
index b452697a2f..61565f905d 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1001,6 +1001,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                               
        xldata->onCatalogAccessibleInLogicalDecoding,
                                                                                
        rlocator);
        }
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b42a6bfa61..a79f010286 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8696,6 +8696,7 @@ heap_xlog_prune(XLogReaderState *record)
         */
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->onCatalogAccessibleInLogicalDecoding,
                                                                                
        rlocator);
 
        /*
@@ -8865,6 +8866,7 @@ heap_xlog_visible(XLogReaderState *record)
         */
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->onCatalogAccessibleInLogicalDecoding,
                                                                                
        rlocator);
 
        /*
@@ -9120,6 +9122,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->onCatalogAccessibleInLogicalDecoding,
                                                                                
        rlocator);
        }
 
diff --git a/src/backend/access/nbtree/nbtxlog.c 
b/src/backend/access/nbtree/nbtxlog.c
index 3e311a98a6..6289f8d250 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->onCatalogAccessibleInLogicalDecoding,
                                                                                
        rlocator);
        }
 
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                               
                   xlrec->onCatalogAccessibleInLogicalDecoding,
                                                                                
                   xlrec->locator);
 }
 
diff --git a/src/backend/access/spgist/spgxlog.c 
b/src/backend/access/spgist/spgxlog.c
index 44adc2098f..6ad467117f 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
                
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                               
        xldata->onCatalogAccessibleInLogicalDecoding,
                                                                                
        locator);
        }
 
diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index a31fbbff78..e4503fb36d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7967,6 +7967,19 @@ xlog_redo(XLogReaderState *record)
                /* Update our copy of the parameters in pg_control */
                memcpy(&xlrec, XLogRecGetData(record), 
sizeof(xl_parameter_change));
 
+               /*
+                * Invalidate logical slots if we are in hot standby and the 
primary does not
+                * have a WAL level sufficient for logical decoding. No need to 
search
+                * for potentially conflicting logical slots if the standby is 
running
+                * with wal_level lower than logical, because in that case, we 
would
+                * have either disallowed creation of logical slots or 
invalidated existing
+                * ones.
+                */
+               if (InRecovery && InHotStandby &&
+                       xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+                       wal_level >= WAL_LEVEL_LOGICAL)
+                       
InvalidateConflictingLogicalReplicationSlots(InvalidOid,InvalidTransactionId);
+
                LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->MaxConnections = xlrec.MaxConnections;
                ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/catalog/system_views.sql 
b/src/backend/catalog/system_views.sql
index 2d8104b090..0e0b8ef415 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1065,7 +1065,8 @@ CREATE VIEW pg_stat_database_conflicts AS
             pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
             pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
             pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
-            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+            pg_stat_get_db_conflict_logicalslot(D.oid) AS 
confl_active_logicalslot
     FROM pg_database D;
 
 CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/logical/logicalfuncs.c 
b/src/backend/replication/logical/logicalfuncs.c
index 5c23178570..8432de219b 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -216,11 +216,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, 
bool confirm, bool bin
 
                /*
                 * After the sanity checks in CreateDecodingContext, make sure 
the
-                * restart_lsn is valid.  Avoid "cannot get changes" wording in 
this
+                * restart_lsn is valid and that xmin or catalog_xmin is valid.
+                * Avoid "cannot get changes" wording in this
                 * errmsg because that'd be confusingly ambiguous about no 
changes
                 * being available.
                 */
-               if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
+               if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)
+                       || (!TransactionIdIsValid(MyReplicationSlot->data.xmin)
+                               && 
!TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin)))
                        ereport(ERROR,
                                        
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                         errmsg("can no longer get changes from 
replication slot \"%s\"",
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 899acfd912..6a4e2cd19b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1432,6 +1432,215 @@ restart:
        return invalidated;
 }
 
+/*
+ * Helper for InvalidateConflictingLogicalReplicationSlots -- acquires the given slot
+ * and marks it invalid, if necessary and possible.
+ *
+ * Returns whether ReplicationSlotControlLock was released in the interim (and
+ * in that case we're not holding the lock at return, otherwise we are).
+ *
+ * This is inherently racy, because we release the LWLock
+ * for syscalls, so caller must restart if we return true.
+ */
+static bool
+InvalidatePossiblyConflictingLogicalReplicationSlot(ReplicationSlot *s, 
TransactionId xid)
+{
+       int             last_signaled_pid = 0;
+       bool    released_lock = false;
+
+       for (;;)
+       {
+               TransactionId slot_xmin;
+               TransactionId slot_catalog_xmin;
+               NameData        slotname;
+               int                     active_pid = 0;
+
+               Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, 
LW_SHARED));
+
+               if (!s->in_use)
+               {
+                       if (released_lock)
+                               LWLockRelease(ReplicationSlotControlLock);
+                       break;
+               }
+
+               /*
+                * Check if the slot needs to be invalidated. If it needs to be
+                * invalidated, and is not currently acquired, acquire it and 
mark it
+                * as having been invalidated. We do this with the spinlock 
held to
+                * avoid race conditions -- for example the xmin(s) could move forward,
+                * or the slot could be dropped.
+                */
+               SpinLockAcquire(&s->mutex);
+
+               slot_xmin = s->data.xmin;
+               slot_catalog_xmin = s->data.catalog_xmin;
+
+               /*
+                * If the slot is already invalid or is not conflicting, we 
don't need to
+                * do anything.
+                */
+
+               /* slot has been invalidated */
+               if ((!TransactionIdIsValid(slot_xmin) && 
!TransactionIdIsValid(slot_catalog_xmin))
+                       ||
+               /*
+                * we are not forcing invalidation because the xid is valid
+                * and this is a non-conflicting slot
+                */
+                       (TransactionIdIsValid(xid) && !(
+                               (TransactionIdIsValid(slot_xmin) && 
TransactionIdPrecedesOrEquals(slot_xmin, xid))
+                               ||
+                               (TransactionIdIsValid(slot_catalog_xmin) && 
TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+                               ))
+                       )
+               {
+                       SpinLockRelease(&s->mutex);
+                       if (released_lock)
+                               LWLockRelease(ReplicationSlotControlLock);
+                       break;
+               }
+
+               slotname = s->data.name;
+               active_pid = s->active_pid;
+
+               /*
+                * If the slot can be acquired, do so and mark it invalidated
+                * immediately.  Otherwise we'll signal the owning process, 
below, and
+                * retry.
+                */
+               if (active_pid == 0)
+               {
+                       MyReplicationSlot = s;
+                       s->active_pid = MyProcPid;
+                       s->data.xmin = InvalidTransactionId;
+                       s->data.catalog_xmin = InvalidTransactionId;
+               }
+
+               SpinLockRelease(&s->mutex);
+
+               if (active_pid != 0)
+               {
+                       /*
+                        * Prepare the sleep on the slot's condition variable 
before
+                        * releasing the lock, to close a possible race 
condition if the
+                        * slot is released before the sleep below.
+                        */
+
+                       ConditionVariablePrepareToSleep(&s->active_cv);
+
+                       LWLockRelease(ReplicationSlotControlLock);
+                       released_lock = true;
+
+                       /*
+                        * Signal to terminate the process that owns the slot, 
if we
+                        * haven't already signalled it.  (Avoidance of repeated
+                        * signalling is the only reason for there to be a loop 
in this
+                        * routine; otherwise we could rely on caller's restart 
loop.)
+                        *
+                        * There is the race condition that other process may 
own the slot
+                        * after its current owner process is terminated and 
before this
+                        * process owns it. To handle that, we signal only if 
the PID of
+                        * the owning process has changed from the previous 
time. (This
+                        * logic assumes that the same PID is not reused very 
quickly.)
+                        */
+                       if (last_signaled_pid != active_pid)
+                       {
+                               ereport(LOG,
+                                               (errmsg("terminating process %d 
because replication slot \"%s\" conflicts with recovery",
+                                                               active_pid, 
NameStr(slotname))));
+
+                               (void) SendProcSignal(active_pid, 
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
+                               last_signaled_pid = active_pid;
+                       }
+
+                       /* Wait until the slot is released. */
+                       ConditionVariableSleep(&s->active_cv,
+                                                                       
WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+                       /*
+                        * Re-acquire lock and start over; we expect to 
invalidate the
+                        * slot next time (unless another process acquires the 
slot in the
+                        * meantime).
+                        */
+                       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+                       continue;
+               }
+               else
+               {
+                       /*
+                        * We hold the slot now and have already invalidated 
it; flush it
+                        * to ensure that state persists.
+                        *
+                        * Don't want to hold ReplicationSlotControlLock across 
file
+                        * system operations, so release it now but be sure to 
tell caller
+                        * to restart from scratch.
+                        */
+                       LWLockRelease(ReplicationSlotControlLock);
+                       released_lock = true;
+
+                       /* Make sure the invalidated state persists across 
server restart */
+                       ReplicationSlotMarkDirty();
+                       ReplicationSlotSave();
+                       ReplicationSlotRelease();
+                       pgstat_drop_replslot(s);
+
+                       ereport(LOG,
+                                       (errmsg("invalidating slot \"%s\" 
because it conflicts with recovery", NameStr(slotname))));
+
+                       /* done with this slot for now */
+                       break;
+               }
+       }
+
+       Assert(!released_lock == 
LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+       return released_lock;
+}
+
+/*
+ * Resolve recovery conflicts with logical slots.
+ *
+ * When xid is valid, it means that we are about to remove rows older than xid.
+ * Therefore we need to invalidate slots that depend on seeing those rows.
+ * When xid is invalid, invalidate all logical slots. This is required when the
+ * primary's wal_level is set back to replica, so existing logical slots need to
+ * be invalidated.
+ */
+void
+InvalidateConflictingLogicalReplicationSlots(Oid dboid, TransactionId xid)
+{
+
+       Assert(max_replication_slots >= 0);
+
+       if (max_replication_slots == 0)
+               return;
+restart:
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (int i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (!s->in_use)
+                       continue;
+
+               /* We are only dealing with *logical* slot conflicts. */
+               if (!SlotIsLogical(s))
+                       continue;
+
+               /* not our database and we are not invalidating all databases, skip */
+               if (s->data.database != dboid && TransactionIdIsValid(xid))
+                       continue;
+
+               if (InvalidatePossiblyConflictingLogicalReplicationSlot(s, xid))
+               {
+                       /* if the lock was released, we need to restart from 
scratch */
+                       goto restart;
+               }
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index c11bb3716f..64fbd52e34 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1253,6 +1253,14 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        ReplicationSlotAcquire(cmd->slotname, true);
 
+       if (!TransactionIdIsValid(MyReplicationSlot->data.xmin)
+                && !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin))
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("cannot read from logical replication 
slot \"%s\"",
+                                               cmd->slotname),
+                                errdetail("This slot has been invalidated 
because it was conflicting with recovery.")));
+
        if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                ereport(ERROR,
                                
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
diff --git a/src/backend/storage/ipc/procarray.c 
b/src/backend/storage/ipc/procarray.c
index 0176f30270..d68b752c91 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -3477,6 +3477,10 @@ SignalVirtualTransaction(VirtualTransactionId vxid, 
ProcSignalReason sigmode,
 
                GET_VXID_FROM_PGPROC(procvxid, *proc);
 
+               /*
+                * Note: vxid.localTransactionId can be invalid, which means the
+                * request is to signal a pid that is not running a 
transaction.
+                */
                if (procvxid.backendId == vxid.backendId &&
                        procvxid.localTransactionId == vxid.localTransactionId)
                {
diff --git a/src/backend/storage/ipc/procsignal.c 
b/src/backend/storage/ipc/procsignal.c
index 7767657f27..1b3bf943c1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -669,6 +669,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
                RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+       if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+               
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
                
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c 
b/src/backend/storage/ipc/standby.c
index f43229dfda..553953959d 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -35,6 +35,7 @@
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
+#include "replication/slot.h"
 
 /* User-settable GUC parameters */
 int                    vacuum_defer_cleanup_age;
@@ -475,6 +476,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId 
*waitlist,
  */
 void
 ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+                                                                       bool 
onCatalogAccessibleInLogicalDecoding,
                                                                        
RelFileLocator locator)
 {
        VirtualTransactionId *backends;
@@ -499,6 +501,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
                                                                                
   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                
   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                
   true);
+
+       if (onCatalogAccessibleInLogicalDecoding)
+               InvalidateConflictingLogicalReplicationSlots(locator.dbOid, 
snapshotConflictHorizon);
 }
 
 /*
@@ -507,6 +512,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
  */
 void
 ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId 
snapshotConflictHorizon,
+                                                                               
   bool onCatalogAccessibleInLogicalDecoding,
                                                                                
   RelFileLocator locator)
 {
        /*
@@ -525,7 +531,9 @@ 
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
                TransactionId truncated;
 
                truncated = XidFromFullTransactionId(snapshotConflictHorizon);
-               ResolveRecoveryConflictWithSnapshot(truncated, locator);
+               ResolveRecoveryConflictWithSnapshot(truncated,
+                                                                               
        onCatalogAccessibleInLogicalDecoding,
+                                                                               
        locator);
        }
 }
 
@@ -1486,6 +1494,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        reasonDesc = _("recovery conflict on snapshot");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       reasonDesc = _("recovery conflict on replication slot");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        reasonDesc = _("recovery conflict on buffer deadlock");
                        break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 3082093d1e..d900eed9f4 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2470,6 +2470,9 @@ errdetail_recovery_conflict(void)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        errdetail("User query might have needed to see row 
versions that must be removed.");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       errdetail("User was using a logical replication slot that must be invalidated.");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        errdetail("User transaction caused buffer deadlock with 
recovery.");
                        break;
@@ -3039,6 +3042,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
                        case PROCSIG_RECOVERY_CONFLICT_LOCK:
                        case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
                        case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+                       case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                               /*
+                                * For conflicts that require a logical slot to 
be invalidated, the
+                                * requirement is for the signal receiver to 
release the slot,
+                                * so that it could be invalidated by the 
signal sender. So for
+                                * normal backends, the transaction should be 
aborted, just
+                                * like for other recovery conflicts. But if 
it's a walsender on a
+                                * standby, then it has to be killed so as to 
release an
+                                * acquired logical slot.
+                                */
+                               if (am_cascading_walsender &&
+                                       reason == 
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
+                                       MyReplicationSlot && 
SlotIsLogical(MyReplicationSlot))
+                               {
+                                       RecoveryConflictPending = true;
+                                       QueryCancelPending = true;
+                                       InterruptPending = true;
+                                       break;
+                               }
 
                                /*
                                 * If we aren't in a transaction any longer 
then ignore.
diff --git a/src/backend/utils/activity/pgstat_database.c 
b/src/backend/utils/activity/pgstat_database.c
index d9275611f0..760d2cd882 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
                case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
                        dbentry->n_conflict_bufferpin++;
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       dbentry->n_conflict_logicalslot++;
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        dbentry->n_conflict_startup_deadlock++;
                        break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool 
nowait)
        PGSTAT_ACCUM_DBCOUNT(n_conflict_tablespace);
        PGSTAT_ACCUM_DBCOUNT(n_conflict_lock);
        PGSTAT_ACCUM_DBCOUNT(n_conflict_snapshot);
+       PGSTAT_ACCUM_DBCOUNT(n_conflict_logicalslot);
        PGSTAT_ACCUM_DBCOUNT(n_conflict_bufferpin);
        PGSTAT_ACCUM_DBCOUNT(n_conflict_startup_deadlock);
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c 
b/src/backend/utils/adt/pgstatfuncs.c
index ae3365d917..749912c1f2 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1432,6 +1432,21 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS)
        PG_RETURN_INT64(result);
 }
 
+Datum
+pg_stat_get_db_conflict_logicalslot(PG_FUNCTION_ARGS)
+{
+       Oid                     dbid = PG_GETARG_OID(0);
+       int64           result;
+       PgStat_StatDBEntry *dbentry;
+
+       if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+               result = 0;
+       else
+               result = (int64) (dbentry->n_conflict_logicalslot);
+
+       PG_RETURN_INT64(result);
+}
+
 Datum
 pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS)
 {
@@ -1475,6 +1490,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
                result = (int64) (dbentry->n_conflict_tablespace +
                                                  dbentry->n_conflict_lock +
                                                  dbentry->n_conflict_snapshot +
+                                                 
dbentry->n_conflict_logicalslot +
                                                  dbentry->n_conflict_bufferpin 
+
                                                  
dbentry->n_conflict_startup_deadlock);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f9301b2627..105b01d68e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5539,6 +5539,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '9901',
+  descr => 'statistics: recovery conflicts in database caused by logical 
replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
   descr => 'statistics: recovery conflicts in database caused by shared buffer 
pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 9e2ce6f011..0f4ac12f89 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -291,6 +291,7 @@ typedef struct PgStat_StatDBEntry
        PgStat_Counter n_conflict_tablespace;
        PgStat_Counter n_conflict_lock;
        PgStat_Counter n_conflict_snapshot;
+       PgStat_Counter n_conflict_logicalslot;
        PgStat_Counter n_conflict_bufferpin;
        PgStat_Counter n_conflict_startup_deadlock;
        PgStat_Counter n_temp_files;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 65f2c74239..0ed1d8af28 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -216,6 +216,7 @@ extern XLogRecPtr 
ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern void InvalidateConflictingLogicalReplicationSlots(Oid dboid, 
TransactionId xid);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool 
need_lock);
 extern int     ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
@@ -227,5 +228,6 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId 
xid, char *reason);
 
 #endif                                                 /* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index ee636900f3..56096bd3e2 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -41,6 +41,7 @@ typedef enum
        PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
        PROCSIG_RECOVERY_CONFLICT_LOCK,
        PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+       PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
        PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
        PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index e46c934c56..be86a1246f 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
+                                                                               
                bool onCatalogAccessibleInLogicalDecoding,
                                                                                
                RelFileLocator locator);
 extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId 
snapshotConflictHorizon,
+                                                                               
                           bool onCatalogAccessibleInLogicalDecoding,
                                                                                
                           RelFileLocator locator);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
diff --git a/src/test/regress/expected/rules.out 
b/src/test/regress/expected/rules.out
index 37c1c86473..2eed08619d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1868,7 +1868,8 @@ pg_stat_database_conflicts| SELECT d.oid AS datid,
     pg_stat_get_db_conflict_lock(d.oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot,
     pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin,
-    pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock
+    pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock,
+    pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_active_logicalslot
    FROM pg_database d;
 pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
-- 
2.34.1

From 345dcf6d40919a2f29e191f9e272145c0c204ca1 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Thu, 1 Dec 2022 10:14:30 +0000
Subject: [PATCH v27 1/6] Add info in WAL records in preparation for logical
 slot conflict handling.

When WAL replay on a standby indicates that a catalog table tuple is
to be deleted by an xid greater than a logical slot's catalog_xmin,
the slot's catalog_xmin conflicts with that xid and the conflict needs
to be handled.  While subsequent commits will do the actual conflict
handling, this commit adds a new field, onCatalogAccessibleInLogicalDecoding,
to such WAL records; it is set to true for (user) catalog tables, so as
to arrange for conflict handling.  To make that determination cheap for
index WAL records, a new pg_index column, indisusercatalog, tracks
whether an index belongs to a table with user_catalog_table set to true.
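
As a usage sketch (simplified from the ddl.sql test added below), the
new column simply mirrors the table-level option:

    CREATE TABLE replication_metadata (id serial PRIMARY KEY, relation name)
        WITH (user_catalog_table = true);
    CREATE INDEX replication_metadata_idx1 ON replication_metadata (relation);
    SELECT indexrelid::regclass, indisusercatalog
    FROM pg_index WHERE indrelid = 'replication_metadata'::regclass;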

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand
Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de
Royes Mello
---
 contrib/test_decoding/expected/ddl.out  | 29 +++++++++++++
 contrib/test_decoding/sql/ddl.sql       |  7 ++++
 doc/src/sgml/catalogs.sgml              | 11 +++++
 src/backend/access/common/reloptions.c  |  2 +-
 src/backend/access/gist/gistxlog.c      |  1 +
 src/backend/access/hash/hashinsert.c    |  1 +
 src/backend/access/heap/heapam.c        |  4 +-
 src/backend/access/heap/pruneheap.c     |  1 +
 src/backend/access/heap/visibilitymap.c |  3 +-
 src/backend/access/nbtree/nbtpage.c     |  2 +
 src/backend/access/spgist/spgvacuum.c   |  1 +
 src/backend/catalog/index.c             | 14 +++++--
 src/backend/commands/tablecmds.c        | 55 ++++++++++++++++++++++++-
 src/include/access/gistxlog.h           |  2 +
 src/include/access/hash_xlog.h          |  1 +
 src/include/access/heapam_xlog.h        |  5 ++-
 src/include/access/nbtxlog.h            |  2 +
 src/include/access/spgxlog.h            |  1 +
 src/include/catalog/pg_index.h          |  2 +
 src/include/utils/rel.h                 | 34 +++++++++++++++
 20 files changed, 169 insertions(+), 9 deletions(-)
  11.2% contrib/test_decoding/expected/
   6.6% contrib/test_decoding/sql/
   6.5% doc/src/sgml/
   8.3% src/backend/access/heap/
   7.1% src/backend/access/
   6.0% src/backend/catalog/
  22.6% src/backend/commands/
   8.5% src/include/access/
  21.4% src/include/utils/

diff --git a/contrib/test_decoding/expected/ddl.out 
b/contrib/test_decoding/expected/ddl.out
index 9a28b5ddc5..40cf2f4dc4 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -483,6 +483,7 @@ CREATE TABLE replication_metadata (
 )
 WITH (user_catalog_table = true)
 ;
+CREATE INDEX replication_metadata_idx1 on replication_metadata(relation);
 \d+ replication_metadata
                                                  Table 
"public.replication_metadata"
   Column  |  Type   | Collation | Nullable |                     Default       
               | Storage  | Stats target | Description 
@@ -492,8 +493,15 @@ WITH (user_catalog_table = true)
  options  | text[]  |           |          |                                   
               | extended |              | 
 Indexes:
     "replication_metadata_pkey" PRIMARY KEY, btree (id)
+    "replication_metadata_idx1" btree (relation)
 Options: user_catalog_table=true
 
+SELECT bool_and(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
+ bool_and 
+----------
+ t
+(1 row)
+
 INSERT INTO replication_metadata(relation, options)
 VALUES ('foo', ARRAY['a', 'b']);
 ALTER TABLE replication_metadata RESET (user_catalog_table);
@@ -506,6 +514,13 @@ ALTER TABLE replication_metadata RESET 
(user_catalog_table);
  options  | text[]  |           |          |                                   
               | extended |              | 
 Indexes:
     "replication_metadata_pkey" PRIMARY KEY, btree (id)
+    "replication_metadata_idx1" btree (relation)
+
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
+ bool_or 
+---------
+ f
+(1 row)
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('bar', ARRAY['a', 'b']);
@@ -519,8 +534,15 @@ ALTER TABLE replication_metadata SET (user_catalog_table = 
true);
  options  | text[]  |           |          |                                   
               | extended |              | 
 Indexes:
     "replication_metadata_pkey" PRIMARY KEY, btree (id)
+    "replication_metadata_idx1" btree (relation)
 Options: user_catalog_table=true
 
+SELECT bool_and(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
+ bool_and 
+----------
+ t
+(1 row)
+
 INSERT INTO replication_metadata(relation, options)
 VALUES ('blub', NULL);
 -- make sure rewrites don't work
@@ -538,8 +560,15 @@ ALTER TABLE replication_metadata SET (user_catalog_table = 
false);
  rewritemeornot | integer |           |          |                             
                     | plain    |              | 
 Indexes:
     "replication_metadata_pkey" PRIMARY KEY, btree (id)
+    "replication_metadata_idx1" btree (relation)
 Options: user_catalog_table=false
 
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
+ bool_or 
+---------
+ f
+(1 row)
+
 INSERT INTO replication_metadata(relation, options)
 VALUES ('zaphod', NULL);
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 
'include-xids', '0', 'skip-empty-xacts', '1');
diff --git a/contrib/test_decoding/sql/ddl.sql 
b/contrib/test_decoding/sql/ddl.sql
index 4f76bed72c..85ddd4be03 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -276,19 +276,25 @@ CREATE TABLE replication_metadata (
 )
 WITH (user_catalog_table = true)
 ;
+
+CREATE INDEX replication_metadata_idx1 on replication_metadata(relation);
+
 \d+ replication_metadata
+SELECT bool_and(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('foo', ARRAY['a', 'b']);
 
 ALTER TABLE replication_metadata RESET (user_catalog_table);
 \d+ replication_metadata
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('bar', ARRAY['a', 'b']);
 
 ALTER TABLE replication_metadata SET (user_catalog_table = true);
 \d+ replication_metadata
+SELECT bool_and(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('blub', NULL);
@@ -299,6 +305,7 @@ ALTER TABLE replication_metadata ALTER COLUMN 
rewritemeornot TYPE text;
 
 ALTER TABLE replication_metadata SET (user_catalog_table = false);
 \d+ replication_metadata
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('zaphod', NULL);
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 9ed2b020b7..18d6b99cac 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -4437,6 +4437,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration 
count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indisusercatalog</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the index is linked to a table that is declared as an additional
+       catalog table for purposes of logical replication (that is, the table has
+       <link linkend="sql-createtable"><literal>user_catalog_table</literal></link> set to true).
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>indisreplident</structfield> <type>bool</type>
diff --git a/src/backend/access/common/reloptions.c 
b/src/backend/access/common/reloptions.c
index 75b7344891..4b41f5e68d 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -120,7 +120,7 @@ static relopt_bool boolRelOpts[] =
                        RELOPT_KIND_HEAP,
                        AccessExclusiveLock
                },
-               false
+               HEAP_DEFAULT_USER_CATALOG_TABLE
        },
        {
                {
diff --git a/src/backend/access/gist/gistxlog.c 
b/src/backend/access/gist/gistxlog.c
index cb5affa3d2..b747900a45 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -608,6 +608,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, 
FullTransactionId deleteXid)
         */
 
        /* XLOG stuff */
+       xlrec_reuse.onCatalogAccessibleInLogicalDecoding = 
IndexIsAccessibleInLogicalDecoding(rel);
        xlrec_reuse.locator = rel->rd_locator;
        xlrec_reuse.block = blkno;
        xlrec_reuse.snapshotConflictHorizon = deleteXid;
diff --git a/src/backend/access/hash/hashinsert.c 
b/src/backend/access/hash/hashinsert.c
index 9a921e341e..7e59a384af 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -432,6 +432,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer 
metabuf, Buffer buf)
                        xl_hash_vacuum_one_page xlrec;
                        XLogRecPtr      recptr;
 
+                       xlrec.onCatalogAccessibleInLogicalDecoding = 
RelationIsAccessibleInLogicalDecoding(hrel);
                        xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
                        xlrec.ntuples = ndeletable;
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 747db50376..b42a6bfa61 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -6831,6 +6831,7 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
                snapshotConflictHorizon = FreezeLimit;
                TransactionIdRetreat(snapshotConflictHorizon);
 
+               xlrec.onCatalogAccessibleInLogicalDecoding = 
RelationIsAccessibleInLogicalDecoding(rel);
                xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
                xlrec.nplans = nplans;
 
@@ -8248,7 +8249,7 @@ bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate)
  * update the heap page's LSN.
  */
 XLogRecPtr
-log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
                                 TransactionId snapshotConflictHorizon, uint8 
vmflags)
 {
        xl_heap_visible xlrec;
@@ -8258,6 +8259,7 @@ log_heap_visible(RelFileLocator rlocator, Buffer 
heap_buffer, Buffer vm_buffer,
        Assert(BufferIsValid(heap_buffer));
        Assert(BufferIsValid(vm_buffer));
 
+       xlrec.onCatalogAccessibleInLogicalDecoding = 
RelationIsAccessibleInLogicalDecoding(rel);
        xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
        xlrec.flags = vmflags;
        XLogBeginInsert();
diff --git a/src/backend/access/heap/pruneheap.c 
b/src/backend/access/heap/pruneheap.c
index 91c5f5e9ef..aae78f7144 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -418,6 +418,7 @@ heap_page_prune(Relation relation, Buffer buffer,
                        xl_heap_prune xlrec;
                        XLogRecPtr      recptr;
 
+                       xlrec.onCatalogAccessibleInLogicalDecoding = 
RelationIsAccessibleInLogicalDecoding(relation);
                        xlrec.snapshotConflictHorizon = 
prstate.snapshotConflictHorizon;
                        xlrec.nredirected = prstate.nredirected;
                        xlrec.ndead = prstate.ndead;
diff --git a/src/backend/access/heap/visibilitymap.c 
b/src/backend/access/heap/visibilitymap.c
index 4ed70275e2..0bd73f4d9f 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -283,8 +283,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer 
heapBuf,
                        if (XLogRecPtrIsInvalid(recptr))
                        {
                                Assert(!InRecovery);
-                               recptr = log_heap_visible(rel->rd_locator, 
heapBuf, vmBuf,
-                                                                               
  cutoff_xid, flags);
+                               recptr = log_heap_visible(rel, heapBuf, vmBuf, 
cutoff_xid, flags);
 
                                /*
                                 * If data checksums are enabled (or 
wal_log_hints=on), we
diff --git a/src/backend/access/nbtree/nbtpage.c 
b/src/backend/access/nbtree/nbtpage.c
index 65aa44893c..88773c0e41 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -836,6 +836,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, 
FullTransactionId safexid)
         */
 
        /* XLOG stuff */
+       xlrec_reuse.onCatalogAccessibleInLogicalDecoding = 
IndexIsAccessibleInLogicalDecoding(rel);
        xlrec_reuse.locator = rel->rd_locator;
        xlrec_reuse.block = blkno;
        xlrec_reuse.snapshotConflictHorizon = safexid;
@@ -1358,6 +1359,7 @@ _bt_delitems_delete(Relation rel, Buffer buf,
                XLogRecPtr      recptr;
                xl_btree_delete xlrec_delete;
 
+               xlrec_delete.onCatalogAccessibleInLogicalDecoding = 
IndexIsAccessibleInLogicalDecoding(rel);
                xlrec_delete.snapshotConflictHorizon = snapshotConflictHorizon;
                xlrec_delete.ndeleted = ndeletable;
                xlrec_delete.nupdated = nupdatable;
diff --git a/src/backend/access/spgist/spgvacuum.c 
b/src/backend/access/spgist/spgvacuum.c
index ad90b213b9..be23907687 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -503,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
        spgxlogVacuumRedirect xlrec;
        GlobalVisState *vistest;
 
+       xlrec.onCatalogAccessibleInLogicalDecoding = 
IndexIsAccessibleInLogicalDecoding(index);
        xlrec.nToPlaceholder = 0;
        xlrec.snapshotConflictHorizon = InvalidTransactionId;
 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 61f1d3926a..f6b2c9ac71 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -123,7 +123,8 @@ static void UpdateIndexRelation(Oid indexoid, Oid heapoid,
                                                                bool 
isexclusion,
                                                                bool immediate,
                                                                bool isvalid,
-                                                               bool isready);
+                                                               bool isready,
+                                                               bool 
is_user_catalog);
 static void index_update_stats(Relation rel,
                                                           bool hasindex,
                                                           double reltuples);
@@ -545,7 +546,8 @@ UpdateIndexRelation(Oid indexoid,
                                        bool isexclusion,
                                        bool immediate,
                                        bool isvalid,
-                                       bool isready)
+                                       bool isready,
+                                       bool is_user_catalog)
 {
        int2vector *indkey;
        oidvector  *indcollation;
@@ -622,6 +624,7 @@ UpdateIndexRelation(Oid indexoid,
        values[Anum_pg_index_indcheckxmin - 1] = BoolGetDatum(false);
        values[Anum_pg_index_indisready - 1] = BoolGetDatum(isready);
        values[Anum_pg_index_indislive - 1] = BoolGetDatum(true);
+       values[Anum_pg_index_indisusercatalog - 1] = 
BoolGetDatum(is_user_catalog);
        values[Anum_pg_index_indisreplident - 1] = BoolGetDatum(false);
        values[Anum_pg_index_indkey - 1] = PointerGetDatum(indkey);
        values[Anum_pg_index_indcollation - 1] = PointerGetDatum(indcollation);
@@ -735,6 +738,7 @@ index_create(Relation heapRelation,
        TransactionId relfrozenxid;
        MultiXactId relminmxid;
        bool            create_storage = !RelFileNumberIsValid(relFileNumber);
+       bool            isusercatalog = false;
 
        /* constraint flags can only be set when a constraint is requested */
        Assert((constr_flags == 0) ||
@@ -1014,13 +1018,17 @@ index_create(Relation heapRelation,
         *        (Or, could define a rule to maintain the predicate) --Nels, 
Feb '92
         * ----------------
         */
+       if (heapRelation->rd_options)
+               isusercatalog = ((StdRdOptions *) 
(heapRelation)->rd_options)->user_catalog_table;
+
        UpdateIndexRelation(indexRelationId, heapRelationId, parentIndexRelid,
                                                indexInfo,
                                                collationObjectId, 
classObjectId, coloptions,
                                                isprimary, is_exclusion,
                                                (constr_flags & 
INDEX_CONSTR_CREATE_DEFERRABLE) == 0,
                                                !concurrent && !invalid,
-                                               !concurrent);
+                                               !concurrent,
+                                               isusercatalog);
 
        /*
         * Register relcache invalidation on the indexes' heap relation, to
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 10c1955884..ceb0f8f8a6 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -103,6 +103,7 @@
 #include "utils/syscache.h"
 #include "utils/timestamp.h"
 #include "utils/typcache.h"
+#include "utils/rel.h"
 
 /*
  * ON COMMIT action list
@@ -14189,6 +14190,10 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
        Datum           repl_val[Natts_pg_class];
        bool            repl_null[Natts_pg_class];
        bool            repl_repl[Natts_pg_class];
+       ListCell   *cell;
+       List       *rel_options;
+       bool            catalog_table_val = HEAP_DEFAULT_USER_CATALOG_TABLE;
+       bool            catalog_table = false;
        static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
 
        if (defList == NIL && operation != AT_ReplaceRelOptions)
@@ -14255,7 +14260,6 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
        {
                Query      *view_query = get_view_query(rel);
                List       *view_options = untransformRelOptions(newOptions);
-               ListCell   *cell;
                bool            check_option = false;
 
                foreach(cell, view_options)
@@ -14283,6 +14287,20 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
                }
        }
 
+       /* If user_catalog_table is part of the new options, record its new 
value */
+       rel_options = untransformRelOptions(newOptions);
+
+       foreach(cell, rel_options)
+       {
+               DefElem    *defel = (DefElem *) lfirst(cell);
+
+               if (strcmp(defel->defname, "user_catalog_table") == 0)
+               {
+                       catalog_table = true;
+                       catalog_table_val = defGetBoolean(defel);
+               }
+       }
+
        /*
         * All we need do here is update the pg_class row; the new options will 
be
         * propagated into relcaches during post-commit cache inval.
@@ -14309,6 +14327,41 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
 
        ReleaseSysCache(tuple);
 
+       /* Update the indexes' indisusercatalog flag if needed */
+       if (catalog_table || operation == AT_ResetRelOptions)
+       {
+               Relation        pg_index;
+               HeapTuple       pg_index_tuple;
+               Form_pg_index pg_index_form;
+               ListCell   *index;
+
+               pg_index = table_open(IndexRelationId, RowExclusiveLock);
+
+               foreach(index, RelationGetIndexList(rel))
+               {
+                       Oid                     thisIndexOid = 
lfirst_oid(index);
+
+                       pg_index_tuple = SearchSysCacheCopy1(INDEXRELID,
+                                                                               
                 ObjectIdGetDatum(thisIndexOid));
+                       if (!HeapTupleIsValid(pg_index_tuple))
+                               elog(ERROR, "cache lookup failed for index %u", 
thisIndexOid);
+                       pg_index_form = (Form_pg_index) 
GETSTRUCT(pg_index_tuple);
+
+                       /* Modify the index only if user_catalog_table differs */
+                       if (catalog_table_val != 
pg_index_form->indisusercatalog)
+                       {
+                               pg_index_form->indisusercatalog = 
catalog_table_val;
+                               CatalogTupleUpdate(pg_index, 
&pg_index_tuple->t_self, pg_index_tuple);
+                               InvokeObjectPostAlterHookArg(IndexRelationId, 
thisIndexOid, 0,
+                                                                               
         InvalidOid, true);
+                       }
+
+                       heap_freetuple(pg_index_tuple);
+               }
+
+               table_close(pg_index, RowExclusiveLock);
+       }
+
        /* repeat the whole exercise for the toast table, if there's one */
        if (OidIsValid(rel->rd_rel->reltoastrelid))
        {
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 33f1c7e31b..40c3ea8f71 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -49,6 +49,7 @@ typedef struct gistxlogPageUpdate
  */
 typedef struct gistxlogDelete
 {
+       bool        onCatalogAccessibleInLogicalDecoding;
        TransactionId snapshotConflictHorizon;
        uint16          ntodelete;              /* number of deleted offsets */
 
@@ -97,6 +98,7 @@ typedef struct gistxlogPageDelete
  */
 typedef struct gistxlogPageReuse
 {
+       bool        onCatalogAccessibleInLogicalDecoding;
        RelFileLocator locator;
        BlockNumber block;
        FullTransactionId snapshotConflictHorizon;
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 6dafb4a598..2eae644e89 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -250,6 +250,7 @@ typedef struct xl_hash_init_bitmap_page
  */
 typedef struct xl_hash_vacuum_one_page
 {
+       bool        onCatalogAccessibleInLogicalDecoding;
        TransactionId snapshotConflictHorizon;
        int                     ntuples;
 
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 5c77290eec..3d814a5ae2 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -242,6 +242,7 @@ typedef struct xl_heap_update
  */
 typedef struct xl_heap_prune
 {
+       bool        onCatalogAccessibleInLogicalDecoding;
        TransactionId snapshotConflictHorizon;
        uint16          nredirected;
        uint16          ndead;
@@ -342,6 +343,7 @@ typedef struct xl_heap_freeze_plan
  */
 typedef struct xl_heap_freeze_page
 {
+       bool        onCatalogAccessibleInLogicalDecoding;
        TransactionId snapshotConflictHorizon;
        uint16          nplans;
 
@@ -359,6 +361,7 @@ typedef struct xl_heap_freeze_page
  */
 typedef struct xl_heap_visible
 {
+       bool        onCatalogAccessibleInLogicalDecoding;
        TransactionId snapshotConflictHorizon;
        uint8           flags;
 } xl_heap_visible;
@@ -408,7 +411,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap2_identify(uint8 info);
 extern void heap_xlog_logical_rewrite(XLogReaderState *r);
 
-extern XLogRecPtr log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
                                                                   Buffer vm_buffer,
                                                                   TransactionId snapshotConflictHorizon,
                                                                   uint8 vmflags);
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 3b2d959c69..1931f2bbbc 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -185,6 +185,7 @@ typedef struct xl_btree_dedup
  */
 typedef struct xl_btree_reuse_page
 {
+       bool        onCatalogAccessibleInLogicalDecoding;
        RelFileLocator locator;
        BlockNumber block;
        FullTransactionId snapshotConflictHorizon;
@@ -232,6 +233,7 @@ typedef struct xl_btree_vacuum
 
 typedef struct xl_btree_delete
 {
+       bool        onCatalogAccessibleInLogicalDecoding;
        TransactionId snapshotConflictHorizon;
        uint16          ndeleted;
        uint16          nupdated;
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index 82332cb694..04fe4a4a52 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot
 
 typedef struct spgxlogVacuumRedirect
 {
+       bool        onCatalogAccessibleInLogicalDecoding;
        uint16          nToPlaceholder; /* number of redirects to make placeholders */
        OffsetNumber firstPlaceholder;  /* first placeholder tuple to remove */
        TransactionId snapshotConflictHorizon;  /* newest XID of removed redirects */
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index f853846ee1..dd16431378 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -43,6 +43,8 @@ CATALOG(pg_index,2610,IndexRelationId) BKI_SCHEMA_MACRO
        bool            indcheckxmin;   /* must we wait for xmin to be old? */
        bool            indisready;             /* is this index ready for inserts? */
        bool            indislive;              /* is this index alive at all? */
+       bool            indisusercatalog;       /* is this index linked to a user catalog
+                                                                        * relation? */
        bool            indisreplident; /* is this index the identity for replication? */
 
        /* variable-length fields start here, but we allow direct access to indkey */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index f383a2fca9..9b77e23c29 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -27,6 +27,7 @@
 #include "storage/smgr.h"
 #include "utils/relcache.h"
 #include "utils/reltrigger.h"
+#include "catalog/catalog.h"
 
 
 /*
@@ -343,6 +344,7 @@ typedef struct StdRdOptions
 
 #define HEAP_MIN_FILLFACTOR                    10
 #define HEAP_DEFAULT_FILLFACTOR                100
+#define HEAP_DEFAULT_USER_CATALOG_TABLE                false
 
 /*
  * RelationGetToastTupleTarget
@@ -378,6 +380,9 @@ typedef struct StdRdOptions
  * RelationIsUsedAsCatalogTable
  *             Returns whether the relation should be treated as a catalog table
  *             from the pov of logical decoding.  Note multiple eval of argument!
+ *             This definition should not invoke anything that performs catalog
+ *             access; otherwise it may cause infinite recursion. See the comments
+ *             in RelationIsAccessibleInLogicalDecoding() for details.
  */
 #define RelationIsUsedAsCatalogTable(relation) \
        ((relation)->rd_options && \
@@ -678,12 +683,41 @@ RelationCloseSmgr(Relation relation)
  * RelationIsAccessibleInLogicalDecoding
  *             True if we need to log enough information to have access via
  *             decoding snapshot.
+ *             This definition should not invoke anything that performs catalog
+ *             access. Otherwise, e.g. logging a WAL entry for a catalog relation
+ *             may invoke this function, which will in turn do catalog access,
+ *             which may in turn cause another similar WAL entry to be logged,
+ *             leading to infinite recursion.
  */
 #define RelationIsAccessibleInLogicalDecoding(relation) \
        (XLogLogicalInfoActive() && \
         RelationNeedsWAL(relation) && \
         (IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))
 
+/*
+ * IndexIsUserCatalog
+ *             True if the index is linked to a user catalog relation.
+ */
+#define IndexIsUserCatalog(relation) \
+       (AssertMacro((relation)->rd_rel->relkind == RELKIND_INDEX), \
+        (relation)->rd_index->indisusercatalog)
+
+/*
+ * IndexIsAccessibleInLogicalDecoding
+ *             True if we need to log enough information to have access via
+ *             decoding snapshot.
+ *             This definition should not invoke anything that performs catalog
+ *             access. Otherwise, e.g. logging a WAL entry for a catalog relation
+ *             may invoke this function, which will in turn do catalog access,
+ *             which may in turn cause another similar WAL entry to be logged,
+ *             leading to infinite recursion.
+ */
+#define IndexIsAccessibleInLogicalDecoding(relation) \
+       (AssertMacro((relation)->rd_rel->relkind == RELKIND_INDEX), \
+        XLogLogicalInfoActive() && \
+        RelationNeedsWAL(relation) && \
+        (IsCatalogRelation(relation) || IndexIsUserCatalog(relation)))
+
 /*
  * RelationIsLogicallyLogged
  *             True if we need to log enough information to extract the data from the
-- 
2.34.1
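
To make the intended use of the new IndexIsAccessibleInLogicalDecoding macro a bit more concrete, here is a minimal sketch (illustration only, not extracted from the attached patches) of how a caller building an xl_btree_delete record could fill the new onCatalogAccessibleInLogicalDecoding flag straight from the index Relation, so that no catalog access is needed at WAL-insertion time (which is what the comments added in rel.h require):

    /*
     * Illustration only -- not taken from the attached patches.  Assumes the
     * caller (somewhere like the btree deletion path) already has the index
     * Relation "rel", the buffer "buf" being modified, and the values
     * snapshotConflictHorizon / ndeleted / nupdated at hand.
     */
    xl_btree_delete xlrec_delete;

    /* computed from the index relcache entry alone, no table_open() needed */
    xlrec_delete.onCatalogAccessibleInLogicalDecoding =
        IndexIsAccessibleInLogicalDecoding(rel);
    xlrec_delete.snapshotConflictHorizon = snapshotConflictHorizon;
    xlrec_delete.ndeleted = ndeleted;
    xlrec_delete.nupdated = nupdated;

    XLogBeginInsert();
    XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
    XLogRegisterData((char *) &xlrec_delete, SizeOfBtreeDelete);

The same pattern would apply to the other record types touched above (gistxlogDelete, xl_hash_vacuum_one_page, xl_heap_prune, etc.), using the relation that is actually available at each call site.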
