Hi,

On 4/5/23 1:59 PM, Amit Kapila wrote:
On Wed, Apr 5, 2023 at 3:58 PM Amit Kapila <amit.kapil...@gmail.com> wrote:

On Wed, Apr 5, 2023 at 2:41 PM Drouvot, Bertrand
<bertranddrouvot...@gmail.com> wrote:

minor nitpick:
+
+ /* Intentional fall through to session cancel */
+ /* FALLTHROUGH */

Do we need to repeat fall through twice in different ways?


Few minor comments on 0003:
========================
1.
+ case XLOG_PARAMETER_CHANGE:
+ {
+ xl_parameter_change *xlrec =
+ (xl_parameter_change *) XLogRecGetData(buf->record);
+
+ /*
+ * If wal_level on primary is reduced to less than logical,
+ * then we want to prevent existing logical slots from being
+ * used. Existing logical slots on standby get invalidated
+ * when this WAL record is replayed; and further, slot
+ * creation fails when the wal level is not sufficient; but
+ * all these operations are not synchronized, so a logical
+ * slot may creep in while the wal_level is being reduced.
+ * Hence this extra check.
+ */
+ if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on standby requires wal_level to be at
least logical on the primary server")));

By looking at this change, it is not very clear that this can occur
only on standby. I understand that on primary, we will not allow
restarting the server after changing wal_level if there is a
pre-existing slot but still this looks a bit odd. Shall we have an
Assert to indicate that this will occur only on standby?

I think that's a fair point. Adding an Assert and a comment before the
Assert in V61 attached.


2.
/*
- * Since logical decoding is only permitted on a primary server, we know
- * that the current timeline ID can't be changing any more. If we did this
- * on a standby, we'd have to worry about the values we compute here
- * becoming invalid due to a promotion or timeline change.
+ * Since logical decoding is also permitted on a standby server, we need
+ * to check if the server is in recovery to decide how to get the current
+ * timeline ID (so that it also cover the promotion or timeline change
+ * cases).
   */
+
+ /* make sure we have enough WAL available */
+ flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+ /* the standby could have been promoted, so check if still in recovery */
+ am_cascading_walsender = RecoveryInProgress();

The first part of the comment explains why it is important to check
RecoveryInProgress() and then immediately after that, the patch
invokes WalSndWaitForWal(). It may be better to move the comment after
WalSndWaitForWal() invocation.

Good catch, thanks! done in V61.

Also, it will be better to write a
comment as to why you need to do WalSndWaitForWal() before retrieving
the current timeline as previously that was done afterward.


Agree, done in V61.

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 1bdbea718682cce4953310314759863302b0c0ea Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 14:08:11 +0000
Subject: [PATCH v61 6/6] Doc changes describing details about logical
 decoding.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 doc/src/sgml/logicaldecoding.sgml | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)
 100.0% doc/src/sgml/

diff --git a/doc/src/sgml/logicaldecoding.sgml 
b/doc/src/sgml/logicaldecoding.sgml
index 4e912b4bd4..3da254ed1f 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,28 @@ postgres=# select * from 
pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>
 
+    <para>
+     A logical replication slot can also be created on a hot standby. To 
prevent
+     <command>VACUUM</command> from removing required rows from the system
+     catalogs, <varname>hot_standby_feedback</varname> should be set on the
+     standby. In spite of that, if any required rows get removed, the slot gets
+     invalidated. It's highly recommended to use a physical slot between the 
primary
+     and the standby. Otherwise, hot_standby_feedback will work, but only 
while the
+     connection is alive (for example a node restart would break it). Existing
+     logical slots on standby also get invalidated if wal_level on primary is 
reduced to
+     less than 'logical'.
+    </para>
+
+    <para>
+     For a logical slot to be created, it builds a historic snapshot, for which
+     information of all the currently running transactions is essential. On
+     primary, this information is available, but on standby, this information
+     has to be obtained from primary. So, slot creation may wait for some
+     activity to happen on the primary. If the primary is idle, creating a
+     logical slot on standby may take a noticeable time. One option to speed it
+     is to call the <function>pg_log_standby_snapshot</function> on the 
primary.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
-- 
2.34.1

From 044fa7b75e926153ca665d0ed450bc8352e24b0b Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 09:04:12 +0000
Subject: [PATCH v61 5/6] New TAP test for logical decoding on standby.

In addition to the new TAP test, this commit introduces a new 
pg_log_standby_snapshot()
function.

The idea is to be able to take a snapshot of running transactions and write this
to WAL without requesting for a (costly) checkpoint.

Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 doc/src/sgml/func.sgml                        |  15 +
 src/backend/access/transam/xlogfuncs.c        |  32 +
 src/backend/catalog/system_functions.sql      |   2 +
 src/include/catalog/pg_proc.dat               |   3 +
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 +
 src/test/recovery/meson.build                 |   1 +
 .../t/035_standby_logical_decoding.pl         | 705 ++++++++++++++++++
 7 files changed, 795 insertions(+)
   3.1% src/backend/
   4.0% src/test/perl/PostgreSQL/Test/
  89.7% src/test/recovery/t/

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index dc44a74eb2..9253cd1c18 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27032,6 +27032,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * 
ps.setting::int + :offset
         prepared with <xref linkend="sql-prepare-transaction"/>.
        </para></entry>
       </row>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write this to WAL without
+        having to wait bgwriter or checkpointer to log one. This one is useful 
for
+        logical decoding on standby for which logical slot creation is hanging
+        until such a record is replayed on the standby.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/access/transam/xlogfuncs.c 
b/src/backend/access/transam/xlogfuncs.c
index c07daa874f..481e9a47da 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -38,6 +38,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/timestamp.h"
 #include "utils/tuplestore.h"
+#include "storage/standby.h"
 
 /*
  * Backup-related variables.
@@ -196,6 +197,37 @@ pg_switch_wal(PG_FUNCTION_ARGS)
        PG_RETURN_LSN(switchpoint);
 }
 
+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr      recptr;
+
+       if (RecoveryInProgress())
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("recovery is in progress"),
+                                errhint("pg_log_standby_snapshot() cannot be 
executed during recovery.")));
+
+       if (!XLogStandbyInfoActive())
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("wal_level is not in desired state"),
+                                errhint("wal_level has to be >= 
WAL_LEVEL_REPLICA.")));
+
+       recptr = LogStandbySnapshot();
+
+       /*
+        * As a convenience, return the WAL location of the last inserted record
+        */
+       PG_RETURN_LSN(recptr);
+}
+
 /*
  * pg_create_restore_point: a named point for restore
  *
diff --git a/src/backend/catalog/system_functions.sql 
b/src/backend/catalog/system_functions.sql
index 83ca893444..b7c65ea37d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) 
FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bcbae9036d..284138727e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6426,6 +6426,9 @@
 { oid => '2848', descr => 'switch to new wal file',
   proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn',
   proargtypes => '', prosrc => 'pg_switch_wal' },
+{ oid => '9658', descr => 'log details of the current snapshot to WAL',
+  proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype => 
'pg_lsn',
+  proargtypes => '', prosrc => 'pg_log_standby_snapshot' },
 { oid => '3098', descr => 'create a named restore point',
   proname => 'pg_create_restore_point', provolatile => 'v',
   prorettype => 'pg_lsn', proargtypes => 'text',
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm 
b/src/test/perl/PostgreSQL/Test/Cluster.pm
index a3aef8b5e9..62376de602 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3076,6 +3076,43 @@ $SIG{TERM} = $SIG{INT} = sub {
 
 =pod
 
+=item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+       my ($self, $primary, $slot_name, $dbname) = @_;
+       my ($stdout, $stderr);
+
+       my $handle;
+
+       $handle = IPC::Run::start(['pg_recvlogical', '-d', 
$self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, 
'--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+       # Once slot restart_lsn is created, the standby looks for 
xl_running_xacts
+       # WAL record from the restart_lsn onwards. So firstly, wait until the 
slot
+       # restart_lsn is evaluated.
+
+       $self->poll_query_until(
+               'postgres', qq[
+               SELECT restart_lsn IS NOT NULL
+               FROM pg_catalog.pg_replication_slots WHERE slot_name = 
'$slot_name'
+       ]) or die "timed out waiting for logical slot to calculate its 
restart_lsn";
+
+       # Now arrange for the xl_running_xacts record for which pg_recvlogical
+       # is waiting.
+       $primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+
+       $handle->finish();
+
+       is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on 
standby created')
+               or die "could not create slot" . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 59465b97f3..e834ad5e0d 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
       't/034_create_database.pl',
+      't/035_standby_logical_decoding.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl 
b/src/test/recovery/t/035_standby_logical_decoding.pl
new file mode 100644
index 0000000000..0822012e9d
--- /dev/null
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -0,0 +1,705 @@
+# logical decoding on standby : test logical decoding,
+# recovery conflict and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 67;
+
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, 
$handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $node_cascading_standby = 
PostgreSQL::Test::Cluster->new('cascading_standby');
+my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $res;
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+my $standby_physical_slotname = 'standby_physical';
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+       my ($node, $pat, $off) = @_;
+
+       $off = 0 unless defined $off;
+       my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+       return 0 if (length($log) <= $off);
+
+       $log = substr($log, $off);
+
+       return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+       my ($node, $slotname, $check_expr) = @_;
+
+       $node->poll_query_until(
+               'postgres', qq[
+               SELECT $check_expr
+               FROM pg_catalog.pg_replication_slots
+               WHERE slot_name = '$slotname';
+       ]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+       my ($node) = @_;
+       $node->create_logical_slot_on_standby($node_primary, 'inactiveslot', 
'testdb');
+       $node->create_logical_slot_on_standby($node_primary, 'activeslot', 
'testdb');
+}
+
+# Drop the logical slots on standby.
+sub drop_logical_slots
+{
+       $node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
+       $node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# In case wait is true we are waiting for an active pid on the 'activeslot' 
slot.
+# If wait is not true it means we are testing a known failure scenario.
+sub make_slot_active
+{
+       my ($node, $wait, $to_stdout, $to_stderr) = @_;
+       my $slot_user_handle;
+
+       $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', 
$node->connstr('testdb'), '-S', 'activeslot', '-o', 'include-xids=0', '-o', 
'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, 
'2>', $to_stderr);
+
+       if ($wait)
+       {
+               # make sure activeslot is in use
+               $node->poll_query_until('testdb',
+                       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots 
WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+               ) or die "slot never became active";
+       }
+       return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+       my ($slot_user_handle, $check_stderr) = @_;
+       my $return;
+
+       # our client should've terminated in response to the walsender error
+       $slot_user_handle->finish;
+       $return = $?;
+       cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+       if ($return) {
+               like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+       }
+
+       return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+       my ($slot_user_handle) = @_;
+
+       is($node_standby->slot('inactiveslot')->{'slot_type'}, '', 
'inactiveslot on standby dropped');
+       is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on 
standby dropped');
+
+       check_pg_recvlogical_stderr($slot_user_handle, "conflict with 
recovery");
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+       my ($hsf, $invalidated) = @_;
+
+       $node_standby->append_conf('postgresql.conf',qq[
+       hot_standby_feedback = $hsf
+       ]);
+
+       $node_standby->reload;
+
+       if ($hsf && $invalidated)
+       {
+               # With hot_standby_feedback on, xmin should advance,
+               # but catalog_xmin should still remain NULL since there is no 
logical slot.
+               wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NULL");
+       }
+       elsif ($hsf)
+       {
+               # With hot_standby_feedback on, xmin and catalog_xmin should 
advance.
+               wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+       }
+       else
+       {
+               # Both should be NULL since hs_feedback is off
+               wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NULL AND catalog_xmin IS NULL");
+
+       }
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+       my ($conflicting) = @_;
+
+       if ($conflicting)
+       {
+               $res = $node_standby->safe_psql(
+                               'postgres', qq(
+                                select bool_and(conflicting) from 
pg_replication_slots;));
+
+               is($res, 't',
+                       "Logical slots are reported as conflicting");
+       }
+       else
+       {
+               $res = $node_standby->safe_psql(
+                               'postgres', qq(
+                               select bool_or(conflicting) from 
pg_replication_slots;));
+
+               is($res, 'f',
+                       "Logical slots are reported as non conflicting");
+       }
+}
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM 
pg_create_physical_replication_slot('$primary_slotname');]);
+
+# Check conflicting is NULL for physical slot
+$res = $node_primary->safe_psql(
+               'postgres', qq[
+                SELECT conflicting is null FROM pg_replication_slots where 
slot_name = '$primary_slotname';]);
+
+is($res, 't',
+       "Physical slot reports conflicting as NULL");
+
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+       $node_primary, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->safe_psql('testdb', qq[SELECT * FROM 
pg_create_physical_replication_slot('$standby_physical_slotname');]);
+
+#######################
+# Initialize cascading standby node
+#######################
+$node_standby->backup($backup_name);
+$node_cascading_standby->init_from_backup(
+       $node_standby, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_cascading_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$standby_physical_slotname']);
+$node_cascading_standby->start;
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y 
text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, 
s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $result = $node_standby->safe_psql('testdb',
+       qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+       14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+       "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) 
ORDER BY lsn DESC LIMIT 1;"
+);
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby);
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 
'activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+        'otherdb',
+        "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL) ORDER BY lsn DESC LIMIT 1;"
+    ),
+    3,
+    'replaying logical slot from another database fails');
+
+# drop the logical slots
+drop_logical_slots();
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum full on pg_class with hot_standby_feedback turned off on
+# the standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y 
text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM full pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating replication slot \"inactiveslot\" because it conflicts with 
recovery"),
+  'inactiveslot slot invalidation is logged with vacuum FULL on pg_class');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating replication slot \"activeslot\" because it conflicts with 
recovery"),
+  'activeslot slot invalidation is logged with vacuum FULL on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1,1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the 
standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y 
text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating replication slot \"inactiveslot\" because it conflicts with 
recovery", $logstart),
+  'inactiveslot slot invalidation is logged with vacuum on pg_class');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating replication slot \"activeslot\" because it conflicts with 
recovery", $logstart),
+  'activeslot slot invalidation is logged with vacuum on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+# we now expect 2 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a non catalog table
+# Scenario 3: No conflict expected.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# put hot standby feedback to off
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should not trigger a conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y 
text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO conflict_test(x,y) SELECT s, 
s::text FROM generate_series(1,4) s;]);
+$node_primary->safe_psql('testdb', qq[UPDATE conflict_test set x=1, y=1;]);
+$node_primary->safe_psql('testdb', 'VACUUM conflict_test;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should not be issued
+ok( !find_in_log(
+   $node_standby,
+  "invalidating slot \"inactiveslot\" because it conflicts with recovery", 
$logstart),
+  'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
+
+ok( !find_in_log(
+   $node_standby,
+  "invalidating slot \"activeslot\" because it conflicts with recovery", 
$logstart),
+  'activeslot slot invalidation is not logged with vacuum on conflict_test');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been 
updated
+# we now still expect 2 conflicts reported as the counter persist across 
reloads
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot not updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as non conflicting in pg_replication_slots
+check_slots_conflicting_status(0);
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 0);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 4: conflict due to on-access pruning.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to trigger an on-access pruning
+# on a relation marked as user_catalog_table.
+change_hot_standby_feedback_and_wait_for_xmins(0,0);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s 
char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating replication slot \"inactiveslot\" because it conflicts with 
recovery", $logstart),
+  'inactiveslot slot invalidation is logged with on-access pruning');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating replication slot \"activeslot\" because it conflicts with 
recovery", $logstart),
+  'activeslot slot invalidation is logged with on-access pruning');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+# we now expect 3 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 5: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating replication slot \"inactiveslot\" because it conflicts with 
recovery", $logstart),
+  'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating replication slot \"activeslot\" because it conflicts with 
recovery", $logstart),
+  'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+# we now expect 3 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 4) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level at least 
logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires 
wal_level to be at least logical on the primary server");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_replay_catchup($node_standby);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+
+##################################################
+# DROP DATABASE should drops it's slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 
'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+is($node_standby->safe_psql('postgres',
+       q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 
'f',
+       'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+       'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_standby->reload;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y 
text);]);
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# create the logical slots on the cascading standby too
+create_logical_slots($node_cascading_standby);
+
+# Make slots actives
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 1, 
\$cascading_stdout, \$cascading_stderr);
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(1,4) s;]
+);
+
+# Wait for both standbys to catchup
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(5,7) s;]
+);
+
+# Wait for the cascading standby to catchup
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, 
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on 
promoted standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion
+my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+ok( pump_until(
+        $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($stdout);
+is($stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+# check that we are decoding pre and post promotion inserted rows on the 
cascading standby
+$stdout_sql = $node_cascading_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, 
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on 
cascading standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion on the cascading 
standby
+ok( pump_until(
+        $cascading_handle, $pump_timeout, \$cascading_stdout, 
qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($cascading_stdout);
+is($cascading_stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session on 
cascading standby');
-- 
2.34.1

From df9b447889ffa739a223c7db615708256dfdf32c Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 16:46:09 +0000
Subject: [PATCH v61 4/6] For cascading replication, wake up physical
 walsenders separately from logical walsenders.

Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical
walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would have been not applied yet, so logical walsenders were
awakened too early.

Author: Bertrand Drouvot per idea from Jeff Davis and Amit Kapila.
Reviewed-By: Sawada Masahiko, Robert Haas.
---
 src/backend/access/transam/xlog.c           |  6 ++--
 src/backend/access/transam/xlogarchive.c    |  2 +-
 src/backend/access/transam/xlogrecovery.c   | 30 ++++++++++++----
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         | 40 +++++++++++++++++----
 src/include/replication/walsender.h         | 22 ++++++------
 src/include/replication/walsender_private.h |  3 ++
 7 files changed, 76 insertions(+), 29 deletions(-)
  36.3% src/backend/access/transam/
  47.3% src/backend/replication/
  16.2% src/include/replication/

diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 779f5c3711..70ac8fc33b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
        END_CRIT_SECTION();
 
        /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
        /*
         * If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
        END_CRIT_SECTION();
 
        /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
        /*
         * Great, done. To take some work off the critical path, try to 
initialize
@@ -5773,7 +5773,7 @@ StartupXLOG(void)
         * If there were cascading standby servers connected to us, nudge any 
wal
         * sender processes to notice that we've been promoted.
         */
-       WalSndWakeup();
+       WalSndWakeup(true, true);
 
        /*
         * If this was a promotion, request an (online) checkpoint now. This 
isn't
diff --git a/src/backend/access/transam/xlogarchive.c 
b/src/backend/access/transam/xlogarchive.c
index a0f5aa24b5..f3fb92c8f9 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char 
*xlogfname)
         * if we restored something other than a WAL segment, but it does no 
harm
         * either.
         */
-       WalSndWakeup();
+       WalSndWakeup(true, false);
 }
 
 /*
diff --git a/src/backend/access/transam/xlogrecovery.c 
b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..186e4ef600 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord 
*record, TimeLineID *repl
        XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
        SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+       /*
+        * Wakeup walsenders:
+        *
+        * On the standby, the WAL is flushed first (which will only wake up
+        * physical walsenders) and then applied, which will only wake up 
logical
+        * walsenders.
+        *
+        * Indeed, logical walsenders on standby can't decode and send data 
until
+        * it's been applied.
+        *
+        * Physical walsenders don't need to be waked up during replay unless
+        * cascading replication is allowed and time line change occured (so 
that
+        * they can notice that they are on a new time line).
+        *
+        * That's why the wake up conditions are for:
+        *
+        *  - physical walsenders in case of new time line and cascade
+        *  replication is allowed.
+        *  - logical walsenders in case of new time line or recovery is in 
progress
+        *  (logical decoding on standby).
+        */
+       WalSndWakeup(switchedTLI && AllowCascadeReplication(),
+                                switchedTLI || RecoveryInProgress());
+
        /*
         * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
         * receiver so that it notices the updated lastReplayedEndRecPtr and 
sends
@@ -1958,12 +1982,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord 
*record, TimeLineID *repl
                 */
                RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
 
-               /*
-                * Wake up any walsenders to notice that we are on a new 
timeline.
-                */
-               if (AllowCascadeReplication())
-                       WalSndWakeup();
-
                /* Reset the prefetcher. */
                XLogPrefetchReconfigure();
        }
diff --git a/src/backend/replication/walreceiver.c 
b/src/backend/replication/walreceiver.c
index 685af51d5d..d2aa93734c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
                /* Signal the startup process and walsender that new WAL has 
arrived */
                WakeupRecovery();
                if (AllowCascadeReplication())
-                       WalSndWakeup();
+                       WalSndWakeup(true, !RecoveryInProgress());
 
                /* Report XLOG streaming progress in PS display */
                if (update_process_title)
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 2d908d1de2..5c68ebb79e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2628,6 +2628,23 @@ InitWalSenderSlot(void)
                        walsnd->sync_standby_priority = 0;
                        walsnd->latch = &MyProc->procLatch;
                        walsnd->replyTime = 0;
+
+                       /*
+                        * The kind assignment is done here and not in 
StartReplication()
+                        * and StartLogicalReplication(). Indeed, the logical 
walsender
+                        * needs to read WAL records (like snapshot of running
+                        * transactions) during the slot creation. So it needs 
to be woken
+                        * up based on its kind.
+                        *
+                        * The kind assignment could also be done in 
StartReplication(),
+                        * StartLogicalReplication() and 
CREATE_REPLICATION_SLOT but it
+                        * seems better to set it on one place.
+                        */
+                       if (MyDatabaseId == InvalidOid)
+                               walsnd->kind = REPLICATION_KIND_PHYSICAL;
+                       else
+                               walsnd->kind = REPLICATION_KIND_LOGICAL;
+
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        MyWalSnd = (WalSnd *) walsnd;
@@ -3310,30 +3327,39 @@ WalSndShmemInit(void)
 }
 
 /*
- * Wake up all walsenders
+ * Wake up physical, logical or both walsenders kind
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ * applied
+ *
+ * For cascading replication we need to wake up physical
+ * walsenders separately from logical walsenders (see the comment before 
calling
+ * WalSndWakeup() in ApplyWalRecord() for more details).
  *
  * This will be called inside critical sections, so throwing an error is not
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
        int                     i;
 
        for (i = 0; i < max_wal_senders; i++)
        {
                Latch      *latch;
+               ReplicationKind kind;
                WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
-               /*
-                * Get latch pointer with spinlock held, for the unlikely case 
that
-                * pointer reads aren't atomic (as they're 8 bytes).
-                */
+               /* get latch pointer and kind with spinlock helds */
                SpinLockAcquire(&walsnd->mutex);
                latch = walsnd->latch;
+               kind = walsnd->kind;
                SpinLockRelease(&walsnd->mutex);
 
-               if (latch != NULL)
+               if (latch != NULL && ((physical && kind == 
REPLICATION_KIND_PHYSICAL) ||
+                                                         (logical && kind == 
REPLICATION_KIND_LOGICAL)))
                        SetLatch(latch);
        }
 }
diff --git a/src/include/replication/walsender.h 
b/src/include/replication/walsender.h
index 52bb3e2aae..9df7e50f94 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
 extern void WalSndInitStopping(void);
 extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
 /*
  * wakeup walsenders if there is work to be done
  */
-#define WalSndWakeupProcessRequests()          \
-       do                                                                      
        \
-       {                                                                       
        \
-               if (wake_wal_senders)                           \
-               {                                                               
        \
-                       wake_wal_senders = false;               \
-                       if (max_wal_senders > 0)                \
-                               WalSndWakeup();                         \
-               }                                                               
        \
-       } while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+       if (wake_wal_senders)
+       {
+               wake_wal_senders = false;
+               if (max_wal_senders > 0)
+                       WalSndWakeup(physical, logical);
+       }
+}
 
 #endif                                                 /* _WALSENDER_H */
diff --git a/src/include/replication/walsender_private.h 
b/src/include/replication/walsender_private.h
index 5310e054c4..ff25aa70a8 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -15,6 +15,7 @@
 #include "access/xlog.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
+#include "nodes/replnodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
         * Timestamp of the last message received from standby.
         */
        TimestampTz replyTime;
+
+       ReplicationKind kind;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;
-- 
2.34.1

From eeafb43e73060500656e4f155e0451e35c0d2b42 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 12:45:20 +0000
Subject: [PATCH v61 3/6] Allow logical decoding on standby.

Allow a logical slot to be created on standby. Restrict its usage
or its creation if wal_level on primary is less than logical.
During slot creation, it's restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on standby waits for an
xl_running_xact record to arrive from primary.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 src/backend/access/transam/xlog.c         | 11 +++++
 src/backend/replication/logical/decode.c  | 30 +++++++++++-
 src/backend/replication/logical/logical.c | 36 +++++++-------
 src/backend/replication/slot.c            | 58 ++++++++++++-----------
 src/backend/replication/walsender.c       | 48 ++++++++++++-------
 src/include/access/xlog.h                 |  1 +
 6 files changed, 123 insertions(+), 61 deletions(-)
   4.3% src/backend/access/transam/
  38.9% src/backend/replication/logical/
  55.9% src/backend/replication/

diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 10085aa0d6..779f5c3711 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4466,6 +4466,17 @@ LocalProcessControlFile(bool reset)
        ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+       return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..8352dbf5df 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
                         * can restart from there.
                         */
                        break;
+               case XLOG_PARAMETER_CHANGE:
+                       {
+                               xl_parameter_change *xlrec =
+                               (xl_parameter_change *) 
XLogRecGetData(buf->record);
+
+                               /*
+                                * If wal_level on primary is reduced to less 
than logical,
+                                * then we want to prevent existing logical 
slots from being
+                                * used. Existing logical slots on standby get 
invalidated
+                                * when this WAL record is replayed; and 
further, slot
+                                * creation fails when the wal level is not 
sufficient; but
+                                * all these operations are not synchronized, 
so a logical
+                                * slot may creep in while the wal_level is 
being reduced.
+                                * Hence this extra check.
+                                */
+                               if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+                               {
+                                       /*
+                                        * This can occur only on a standby, as 
a primary would
+                                        * not allow to restart after changing 
wal_level < logical
+                                        * if there is pre-existing logical 
slot.
+                                        */
+                                       Assert(RecoveryInProgress());
+                                       ereport(ERROR,
+                                                       
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                        errmsg("logical 
decoding on standby requires wal_level to be at least logical on the primary 
server")));
+                               }
+                               break;
+                       }
                case XLOG_NOOP:
                case XLOG_NEXTOID:
                case XLOG_SWITCH:
                case XLOG_BACKUP_END:
-               case XLOG_PARAMETER_CHANGE:
                case XLOG_RESTORE_POINT:
                case XLOG_FPW_CHANGE:
                case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c 
b/src/backend/replication/logical/logical.c
index c3ec97a0a6..60a5008b6d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
                                
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("logical decoding requires a database 
connection")));
 
-       /* ----
-        * TODO: We got to change that someday soon...
-        *
-        * There's basically three things missing to allow this:
-        * 1) We need to be able to correctly and quickly identify the timeline 
a
-        *        LSN belongs to
-        * 2) We need to force hot_standby_feedback to be enabled at all times 
so
-        *        the primary cannot remove rows we need.
-        * 3) support dropping replication slots referring to a database, in
-        *        dbase_redo. There can't be any active ones due to HS recovery
-        *        conflicts, so that should be relatively easy.
-        * ----
-        */
        if (RecoveryInProgress())
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("logical decoding cannot be used while 
in recovery")));
+       {
+               /*
+                * This check may have race conditions, but whenever
+                * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, 
we
+                * verify that there are no existing logical replication slots. 
And to
+                * avoid races around creating a new slot,
+                * CheckLogicalDecodingRequirements() is called once before 
creating
+                * the slot, and once when logical decoding is initially 
starting up.
+                */
+               if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+                       ereport(ERROR,
+                                       
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("logical decoding on standby 
requires wal_level to be at least logical on the primary server")));
+       }
 }
 
 /*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
        LogicalDecodingContext *ctx;
        MemoryContext old_context;
 
+       /*
+        * On standby, this check is also required while creating the slot. 
Check
+        * the comments in this function.
+        */
+       CheckLogicalDecodingRequirements();
+
        /* shorter lines... */
        slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index aacb75bebf..efae9588f3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -41,6 +41,7 @@
 
 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -1183,37 +1184,28 @@ ReplicationSlotReserveWal(void)
                /*
                 * For logical slots log a standby snapshot and start logical 
decoding
                 * at exactly that position. That allows the slot to start up 
more
-                * quickly.
+                * quickly. But on a standby we cannot do WAL writes, so just 
use the
+                * replay pointer; effectively, an attempt to create a logical 
slot on
+                * standby will cause it to wait for an xl_running_xact record 
to be
+                * logged independently on the primary, so that a snapshot can 
be
+                * built using the record.
                 *
-                * That's not needed (or indeed helpful) for physical slots as 
they'll
-                * start replay at the last logged checkpoint anyway. Instead 
return
-                * the location of the last redo LSN. While that slightly 
increases
-                * the chance that we have to retry, it's where a base backup 
has to
-                * start replay at.
+                * None of this is needed (or indeed helpful) for physical 
slots as
+                * they'll start replay at the last logged checkpoint anyway. 
Instead
+                * return the location of the last redo LSN. While that slightly
+                * increases the chance that we have to retry, it's where a base
+                * backup has to start replay at.
                 */
-               if (!RecoveryInProgress() && SlotIsLogical(slot))
-               {
-                       XLogRecPtr      flushptr;
-
-                       /* start at current insert position */
+               if (SlotIsPhysical(slot))
+                       restart_lsn = GetRedoRecPtr();
+               else if (RecoveryInProgress())
+                       restart_lsn = GetXLogReplayRecPtr(NULL);
+               else
                        restart_lsn = GetXLogInsertRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-
-                       /* make sure we have enough information to start */
-                       flushptr = LogStandbySnapshot();
 
-                       /* and make sure it's fsynced to disk */
-                       XLogFlush(flushptr);
-               }
-               else
-               {
-                       restart_lsn = GetRedoRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-               }
+               SpinLockAcquire(&slot->mutex);
+               slot->data.restart_lsn = restart_lsn;
+               SpinLockRelease(&slot->mutex);
 
                /* prevent WAL removal as fast as possible */
                ReplicationSlotsComputeRequiredLSN();
@@ -1229,8 +1221,18 @@ ReplicationSlotReserveWal(void)
                if (XLogGetLastRemovedSegno() < segno)
                        break;
        }
-}
 
+       if (!RecoveryInProgress() && SlotIsLogical(slot))
+       {
+               XLogRecPtr      flushptr;
+
+               /* make sure we have enough information to start */
+               flushptr = LogStandbySnapshot();
+
+               /* and make sure it's fsynced to disk */
+               XLogFlush(flushptr);
+       }
+}
 
 /*
  * Report terminating or conflicting message.
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index b686691ca2..2d908d1de2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr 
targetPagePtr, int req
        int                     count;
        WALReadError errinfo;
        XLogSegNo       segno;
-       TimeLineID      currTLI = GetWALInsertionTimeLine();
+       TimeLineID      currTLI;
+
+       /*
+        * Make sure we have enough WAL available before retrieving the current
+        * timeline. This is needed to determine am_cascading_walsender 
accurately
+        * which is needed to determine the current timeline.
+        */
+       flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
        /*
-        * Since logical decoding is only permitted on a primary server, we know
-        * that the current timeline ID can't be changing any more. If we did 
this
-        * on a standby, we'd have to worry about the values we compute here
-        * becoming invalid due to a promotion or timeline change.
+        * Since logical decoding is also permitted on a standby server, we need
+        * to check if the server is in recovery to decide how to get the 
current
+        * timeline ID (so that it also cover the promotion or timeline change
+        * cases).
         */
+       am_cascading_walsender = RecoveryInProgress();
+
+       if (am_cascading_walsender)
+               GetXLogReplayRecPtr(&currTLI);
+       else
+               currTLI = GetWALInsertionTimeLine();
+
        XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
        sendTimeLineIsHistoric = (state->currTLI != currTLI);
        sendTimeLine = state->currTLI;
        sendTimeLineValidUpto = state->currTLIValidUntil;
        sendTimeLineNextTLI = state->nextTLI;
 
-       /* make sure we have enough WAL available */
-       flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
        /* fail if not (implies we are going to shut down) */
        if (flushptr < targetPagePtr + reqLen)
                return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr 
targetPagePtr, int req
                                 cur_page,
                                 targetPagePtr,
                                 XLOG_BLCKSZ,
-                                state->seg.ws_tli, /* Pass the current TLI 
because only
-                                                                        * 
WalSndSegmentOpen controls whether new
-                                                                        * TLI 
is needed. */
+                                currTLI,               /* Pass the current TLI 
because only
+                                                                * 
WalSndSegmentOpen controls whether new TLI
+                                                                * is needed. */
                                 &errinfo))
                WALReadRaiseError(&errinfo);
 
@@ -3073,10 +3084,14 @@ XLogSendLogical(void)
         * If first time through in this session, initialize flushPtr.  
Otherwise,
         * we only need to update flushPtr if EndRecPtr is past it.
         */
-       if (flushPtr == InvalidXLogRecPtr)
-               flushPtr = GetFlushRecPtr(NULL);
-       else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-               flushPtr = GetFlushRecPtr(NULL);
+       if (flushPtr == InvalidXLogRecPtr ||
+               logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+       {
+               if (am_cascading_walsender)
+                       flushPtr = GetStandbyFlushRecPtr(NULL);
+               else
+                       flushPtr = GetFlushRecPtr(NULL);
+       }
 
        /* If EndRecPtr is still past our flushPtr, it means we caught up. */
        if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3167,7 +3182,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
        receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-       *tli = replayTLI;
+       if (tli)
+               *tli = replayTLI;
 
        result = replayPtr;
        if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738..48ca852381 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
-- 
2.34.1

From d238d118b1cabe2b99f4c65fd59cc1957cbb006f Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 11:28:30 +0000
Subject: [PATCH v61 2/6] Arrange for a new pg_stat_database_conflicts and
 pg_replication_slots field

As we handled logical slot conflicts on standby on the previous commit, we
can expose the conflict in pg_stat_database_conflicts and pg_replication_slots.

Adding:

- confl_active_logicalslot in pg_stat_database_conflicts
- conflicting in pg_replication_slots

to do so.
---
 doc/src/sgml/monitoring.sgml                 | 11 +++++++++++
 doc/src/sgml/system-views.sgml               | 10 ++++++++++
 src/backend/catalog/system_views.sql         |  6 ++++--
 src/backend/replication/slotfuncs.c          | 12 +++++++++++-
 src/backend/utils/activity/pgstat_database.c |  4 ++++
 src/backend/utils/adt/pgstatfuncs.c          |  3 +++
 src/include/catalog/pg_proc.dat              | 11 ++++++++---
 src/include/pgstat.h                         |  1 +
 src/test/regress/expected/rules.out          |  8 +++++---
 9 files changed, 57 insertions(+), 9 deletions(-)
  33.7% doc/src/sgml/
   8.1% src/backend/catalog/
  13.1% src/backend/replication/
   5.9% src/backend/utils/activity/
   5.6% src/backend/utils/adt/
  24.6% src/include/catalog/
   6.9% src/test/regress/expected/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index fd0ffbb1e0..9fc585b9e7 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4674,6 +4674,17 @@ SELECT pid, wait_event_type, wait_event FROM 
pg_stat_activity WHERE wait_event i
        deadlocks
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of active logical slots in this database that have been
+       invalidated because they conflict with recovery (note that inactive ones
+       are also invalidated but do not increment this counter)
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index bb1a418450..57b228076e 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        false for physical slots.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>conflicting</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if this logical slot conflicted with recovery (and so is now
+       invalidated). Always NULL for physical slots.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql 
b/src/backend/catalog/system_views.sql
index 6b098234f8..c25067d06d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -999,7 +999,8 @@ CREATE VIEW pg_replication_slots AS
             L.confirmed_flush_lsn,
             L.wal_status,
             L.safe_wal_size,
-            L.two_phase
+            L.two_phase,
+            L.conflicting
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
@@ -1067,7 +1068,8 @@ CREATE VIEW pg_stat_database_conflicts AS
             pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
             pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
             pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
-            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+            pg_stat_get_db_conflict_logicalslot(D.oid) AS 
confl_active_logicalslot
     FROM pg_database D;
 
 CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/slotfuncs.c 
b/src/backend/replication/slotfuncs.c
index 015d276fd9..6473c73eca 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        XLogRecPtr      currlsn;
        int                     slotno;
@@ -403,6 +403,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
                values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+               if (slot_contents.data.database == InvalidOid)
+                       nulls[i++] = true;
+               else
+               {
+                       if (LogicalReplicationSlotIsInvalid(slot))
+                               values[i++] = BoolGetDatum(true);
+                       else
+                               values[i++] = BoolGetDatum(false);
+               }
+
                Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
                tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/backend/utils/activity/pgstat_database.c 
b/src/backend/utils/activity/pgstat_database.c
index 6e650ceaad..7149f22f72 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
                case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
                        dbentry->conflict_bufferpin++;
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       dbentry->conflict_logicalslot++;
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        dbentry->conflict_startup_deadlock++;
                        break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool 
nowait)
        PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
        PGSTAT_ACCUM_DBCOUNT(conflict_lock);
        PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
+       PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
        PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
        PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c 
b/src/backend/utils/adt/pgstatfuncs.c
index eec9f3cf9b..4de60d8aa1 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1066,6 +1066,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
 /* pg_stat_get_db_xact_rollback */
 PG_STAT_GET_DBENTRY_INT64(xact_rollback)
 
+/* pg_stat_get_db_conflict_logicalslot */
+PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
 
 Datum
 pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
@@ -1099,6 +1101,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
                result = (int64) (dbentry->conflict_tablespace +
                                                  dbentry->conflict_lock +
                                                  dbentry->conflict_snapshot +
+                                                 dbentry->conflict_logicalslot 
+
                                                  dbentry->conflict_bufferpin +
                                                  
dbentry->conflict_startup_deadlock);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f9f2642201..bcbae9036d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5605,6 +5605,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '9901',
+  descr => 'statistics: recovery conflicts in database caused by logical 
replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
   descr => 'statistics: recovery conflicts in database caused by shared buffer 
pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
@@ -11071,9 +11076,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => 
'{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => 
'{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+  proallargtypes => 
'{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => 
'{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 75d258d921..fa3d326d86 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -331,6 +331,7 @@ typedef struct PgStat_StatDBEntry
        PgStat_Counter conflict_tablespace;
        PgStat_Counter conflict_lock;
        PgStat_Counter conflict_snapshot;
+       PgStat_Counter conflict_logicalslot;
        PgStat_Counter conflict_bufferpin;
        PgStat_Counter conflict_startup_deadlock;
        PgStat_Counter temp_files;
diff --git a/src/test/regress/expected/rules.out 
b/src/test/regress/expected/rules.out
index ab1aebfde4..06d3f1f5d3 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.confirmed_flush_lsn,
     l.wal_status,
     l.safe_wal_size,
-    l.two_phase
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, 
temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, 
confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+    l.two_phase,
+    l.conflicting
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, 
temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, 
confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
@@ -1869,7 +1870,8 @@ pg_stat_database_conflicts| SELECT oid AS datid,
     pg_stat_get_db_conflict_lock(oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(oid) AS confl_snapshot,
     pg_stat_get_db_conflict_bufferpin(oid) AS confl_bufferpin,
-    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock
+    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock,
+    pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot
    FROM pg_database d;
 pg_stat_gssapi| SELECT pid,
     gss_auth AS gss_authenticated,
-- 
2.34.1

From 13c65864c30cd9c57fa6204708543fcfb8435c69 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 08:57:56 +0000
Subject: [PATCH v61 1/6] Handle logical slot conflicts on standby.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

During WAL replay on standby, when slot conflict is identified,
invalidate such slots. Also do the same thing if wal_level on the primary server
is reduced to below logical and there are existing logical slots
on standby. Introduce a new ProcSignalReason value for slot
conflict recovery.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello,
Bharath Rupireddy, Amit Kapila, Álvaro Herrera
---
 src/backend/access/gist/gistxlog.c            |   2 +
 src/backend/access/hash/hash_xlog.c           |   1 +
 src/backend/access/heap/heapam.c              |   3 +
 src/backend/access/nbtree/nbtxlog.c           |   2 +
 src/backend/access/spgist/spgxlog.c           |   1 +
 src/backend/access/transam/xlog.c             |  21 +-
 .../replication/logical/logicalfuncs.c        |  13 +-
 src/backend/replication/slot.c                | 189 ++++++++++++++----
 src/backend/replication/slotfuncs.c           |   3 +-
 src/backend/replication/walsender.c           |   7 +
 src/backend/storage/ipc/procsignal.c          |   3 +
 src/backend/storage/ipc/standby.c             |  13 +-
 src/backend/tcop/postgres.c                   |   9 +
 src/include/replication/slot.h                |  55 ++++-
 src/include/storage/procsignal.h              |   1 +
 src/include/storage/standby.h                 |   2 +
 16 files changed, 274 insertions(+), 51 deletions(-)
   7.5% src/backend/access/transam/
   5.7% src/backend/replication/logical/
  61.6% src/backend/replication/
   5.3% src/backend/storage/ipc/
   5.0% src/backend/
  13.8% src/include/replication/

diff --git a/src/backend/access/gist/gistxlog.c 
b/src/backend/access/gist/gistxlog.c
index b7678f3c14..9a86fb3fef 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                               
        xldata->isCatalogRel,
                                                                                
        rlocator);
        }
 
@@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
         */
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                               
                   xlrec->isCatalogRel,
                                                                                
                   xlrec->locator);
 }
 
diff --git a/src/backend/access/hash/hash_xlog.c 
b/src/backend/access/hash/hash_xlog.c
index f2dd9be8d3..e8e06c62a9 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                               
        xldata->isCatalogRel,
                                                                                
        rlocator);
        }
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f7d9ce59a4..371e855683 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8717,6 +8717,7 @@ heap_xlog_prune(XLogReaderState *record)
         */
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->isCatalogRel,
                                                                                
        rlocator);
 
        /*
@@ -8888,6 +8889,7 @@ heap_xlog_visible(XLogReaderState *record)
         */
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
                                                                                
        rlocator);
 
        /*
@@ -9009,6 +9011,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->isCatalogRel,
                                                                                
        rlocator);
        }
 
diff --git a/src/backend/access/nbtree/nbtxlog.c 
b/src/backend/access/nbtree/nbtxlog.c
index 414ca4f6de..c87e46ed66 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->isCatalogRel,
                                                                                
        rlocator);
        }
 
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                               
                   xlrec->isCatalogRel,
                                                                                
                   xlrec->locator);
 }
 
diff --git a/src/backend/access/spgist/spgxlog.c 
b/src/backend/access/spgist/spgxlog.c
index b071b59c8a..459ac929ba 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
                
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                               
        xldata->isCatalogRel,
                                                                                
        locator);
        }
 
diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 46821ad605..10085aa0d6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6806,7 +6806,7 @@ CreateCheckPoint(int flags)
         */
        XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
        KeepLogSeg(recptr, &_logSegNo);
-       if (InvalidateObsoleteReplicationSlots(_logSegNo))
+       if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, NULL))
        {
                /*
                 * Some slots have been invalidated; recalculate the old-segment
@@ -7250,7 +7250,7 @@ CreateRestartPoint(int flags)
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
        endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
        KeepLogSeg(endptr, &_logSegNo);
-       if (InvalidateObsoleteReplicationSlots(_logSegNo))
+       if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, NULL))
        {
                /*
                 * Some slots have been invalidated; recalculate the old-segment
@@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
                /* Update our copy of the parameters in pg_control */
                memcpy(&xlrec, XLogRecGetData(record), 
sizeof(xl_parameter_change));
 
+               /*
+                * Invalidate logical slots if we are in hot standby and the 
primary
+                * does not have a WAL level sufficient for logical decoding. 
No need
+                * to search for potentially conflicting logically slots if 
standby is
+                * running with wal_level lower than logical, because in that 
case, we
+                * would have either disallowed creation of logical slots or
+                * invalidated existing ones.
+                */
+               if (InRecovery && InHotStandby &&
+                       xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+                       wal_level >= WAL_LEVEL_LOGICAL)
+               {
+                       TransactionId ConflictHorizon = InvalidTransactionId;
+
+                       InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, 
InvalidOid, &ConflictHorizon);
+               }
+
                LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->MaxConnections = xlrec.MaxConnections;
                ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/replication/logical/logicalfuncs.c 
b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b..575a047e53 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -216,9 +216,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, 
bool confirm, bool bin
 
                /*
                 * After the sanity checks in CreateDecodingContext, make sure 
the
-                * restart_lsn is valid.  Avoid "cannot get changes" wording in 
this
-                * errmsg because that'd be confusingly ambiguous about no 
changes
-                * being available.
+                * restart_lsn is valid or both effective_xmin and catalog_xmin 
are
+                * valid. Avoid "cannot get changes" wording in this errmsg 
because
+                * that'd be confusingly ambiguous about no changes being 
available.
                 */
                if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                        ereport(ERROR,
@@ -227,6 +227,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, 
bool confirm, bool bin
                                                        NameStr(*name)),
                                         errdetail("This slot has never 
previously reserved WAL, or it has been invalidated.")));
 
+               if (LogicalReplicationSlotIsInvalid(MyReplicationSlot))
+                       ereport(ERROR,
+                                       
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("cannot read from logical 
replication slot \"%s\"",
+                                                       NameStr(*name)),
+                                        errdetail("This slot has been 
invalidated because it was conflicting with recovery.")));
+
                MemoryContextSwitchTo(oldcontext);
 
                /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2293c0c6fc..aacb75bebf 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -110,6 +110,13 @@ static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
 
+/* to report termination/invalidation */
+static void ReportTerminationInvalidation(bool terminating, bool check_on_xid,
+                                                                               
  int pid, NameData slotname,
+                                                                               
  TransactionId *xid,
+                                                                               
  XLogRecPtr restart_lsn,
+                                                                               
  XLogRecPtr oldestLSN);
+
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
  */
@@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
                SpinLockAcquire(&s->mutex);
                effective_xmin = s->effective_xmin;
                effective_catalog_xmin = s->effective_catalog_xmin;
-               invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
-                                          
XLogRecPtrIsInvalid(s->data.restart_lsn));
+               invalidated = ObsoleteSlotIsInvalid(s, true) || 
LogicalReplicationSlotIsInvalid(s);
                SpinLockRelease(&s->mutex);
 
                /* invalidated slots need not apply */
@@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void)
        }
 }
 
+
+/*
+ * Report terminating or conflicting message.
+ *
+ * For both, logical conflict on standby and obsolete slot are handled.
+ */
+static void
+ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
+                                                         NameData slotname, 
TransactionId *xid,
+                                                         XLogRecPtr 
restart_lsn, XLogRecPtr oldestLSN)
+{
+       StringInfoData err_msg;
+       StringInfoData err_detail;
+       bool            hint = false;
+
+       initStringInfo(&err_detail);
+
+       if (check_on_xid)
+       {
+               if (!terminating)
+               {
+                       initStringInfo(&err_msg);
+                       appendStringInfo(&err_msg, _("invalidating replication 
slot \"%s\" because it conflicts with recovery"),
+                                                        NameStr(slotname));
+               }
+
+               if (TransactionIdIsValid(*xid))
+                       appendStringInfo(&err_detail, _("The slot conflicted 
with xid horizon %u."), *xid);
+               else
+                       appendStringInfo(&err_detail, _("Logical decoding on 
standby requires wal_level to be at least logical on the primary server"));
+       }
+       else
+       {
+               if (!terminating)
+               {
+                       initStringInfo(&err_msg);
+                       appendStringInfo(&err_msg, _("invalidating obsolete 
replication slot \"%s\""),
+                                                        NameStr(slotname));
+               }
+
+               appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X 
exceeds the limit by %llu bytes."),
+                                                LSN_FORMAT_ARGS(restart_lsn),
+                                                (unsigned long long) 
(oldestLSN - restart_lsn));
+
+               hint = true;
+       }
+
+       ereport(LOG,
+                       terminating ? errmsg("terminating process %d to release 
replication slot \"%s\"", pid, NameStr(slotname)) :
+                       errmsg_internal("%s", err_msg.data),
+                       errdetail_internal("%s", err_detail.data),
+                       hint ? errhint("You might need to increase 
max_slot_wal_keep_size.") : 0);
+
+       if (!terminating)
+               pfree(err_msg.data);
+
+       pfree(err_detail.data);
+}
+
 /*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and mark it invalid, if necessary and possible.
  *
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
  *
- * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
+ * Sets *invalidated true if an obsolete slot was invalidated. (Untouched 
otherwise.)
  *
  * This is inherently racy, because we release the LWLock
  * for syscalls, so caller must restart if we return true.
  */
 static bool
 InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
-                                                          bool *invalidated)
+                                                          bool *invalidated, 
TransactionId *xid)
 {
        int                     last_signaled_pid = 0;
        bool            released_lock = false;
+       bool            check_on_xid;
+
+       check_on_xid = xid ? true : false;
 
        for (;;)
        {
                XLogRecPtr      restart_lsn;
+
                NameData        slotname;
                int                     active_pid = 0;
 
@@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
XLogRecPtr oldestLSN,
                 * Check if the slot needs to be invalidated. If it needs to be
                 * invalidated, and is not currently acquired, acquire it and 
mark it
                 * as having been invalidated.  We do this with the spinlock 
held to
-                * avoid race conditions -- for example the restart_lsn could 
move
-                * forward, or the slot could be dropped.
+                * avoid race conditions -- for example the restart_lsn (or the
+                * xmin(s) could) move forward or the slot could be dropped.
                 */
                SpinLockAcquire(&s->mutex);
 
                restart_lsn = s->data.restart_lsn;
 
                /*
-                * If the slot is already invalid or is fresh enough, we don't 
need to
-                * do anything.
+                * If the slot is already invalid or is a non conflicting slot, 
we
+                * don't need to do anything.
                 */
-               if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= 
oldestLSN)
+               if (DoNotInvalidateSlot(s, xid, &oldestLSN))
                {
+                       /* then, we are not forcing for invalidation */
                        SpinLockRelease(&s->mutex);
                        if (released_lock)
                                LWLockRelease(ReplicationSlotControlLock);
@@ -1294,9 +1365,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
XLogRecPtr oldestLSN,
                {
                        MyReplicationSlot = s;
                        s->active_pid = MyProcPid;
-                       s->data.invalidated_at = restart_lsn;
-                       s->data.restart_lsn = InvalidXLogRecPtr;
-
+                       if (xid)
+                       {
+                               s->effective_xmin = InvalidTransactionId;
+                               s->data.catalog_xmin = InvalidTransactionId;
+                       }
+                       else
+                       {
+                               s->data.invalidated_at = restart_lsn;
+                               s->data.restart_lsn = InvalidXLogRecPtr;
+                       }
                        /* Let caller know */
                        *invalidated = true;
                }
@@ -1329,15 +1407,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
XLogRecPtr oldestLSN,
                         */
                        if (last_signaled_pid != active_pid)
                        {
-                               ereport(LOG,
-                                               errmsg("terminating process %d 
to release replication slot \"%s\"",
-                                                          active_pid, 
NameStr(slotname)),
-                                               errdetail("The slot's 
restart_lsn %X/%X exceeds the limit by %llu bytes.",
-                                                                 
LSN_FORMAT_ARGS(restart_lsn),
-                                                                 (unsigned 
long long) (oldestLSN - restart_lsn)),
-                                               errhint("You might need to 
increase max_slot_wal_keep_size."));
-
-                               (void) kill(active_pid, SIGTERM);
+                               ReportTerminationInvalidation(true, 
check_on_xid, active_pid,
+                                                                               
          slotname, xid, restart_lsn,
+                                                                               
          oldestLSN);
+
+                               if (check_on_xid)
+                                       (void) SendProcSignal(active_pid, 
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
+                               else
+                                       (void) kill(active_pid, SIGTERM);
+
                                last_signaled_pid = active_pid;
                        }
 
@@ -1370,14 +1448,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
XLogRecPtr oldestLSN,
                        ReplicationSlotMarkDirty();
                        ReplicationSlotSave();
                        ReplicationSlotRelease();
+                       pgstat_drop_replslot(s);
 
-                       ereport(LOG,
-                                       errmsg("invalidating obsolete 
replication slot \"%s\"",
-                                                  NameStr(slotname)),
-                                       errdetail("The slot's restart_lsn %X/%X 
exceeds the limit by %llu bytes.",
-                                                         
LSN_FORMAT_ARGS(restart_lsn),
-                                                         (unsigned long long) 
(oldestLSN - restart_lsn)),
-                                       errhint("You might need to increase 
max_slot_wal_keep_size."));
+                       ReportTerminationInvalidation(false, check_on_xid, 
active_pid,
+                                                                               
  slotname, xid, restart_lsn,
+                                                                               
  oldestLSN);
 
                        /* done with this slot for now */
                        break;
@@ -1390,20 +1465,36 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
XLogRecPtr oldestLSN,
 }
 
 /*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate Obsolete slots or resolve recovery conflicts with logical slots.
  *
- * Returns true when any slot have got invalidated.
+ * Obsolete case (aka xid is NULL):
  *
- * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ *      Mark any slot that points to an LSN older than the given segment
+ *      as invalid; it requires WAL that's about to be removed.
+ *      invalidated is set to true when any slot have got invalidated.
+ *
+ *  Logical replication slot case:
+ *
+ *      When xid is valid, it means that we are about to remove rows older 
than xid.
+ *      Therefore we need to invalidate slots that depend on seeing those rows.
+ *      When xid is invalid, invalidate all logical slots. This is required 
when the
+ *      master wal_level is set back to replica, so existing logical slots 
need to
+ *      be invalidated.
  */
 bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid, 
TransactionId *xid)
 {
-       XLogRecPtr      oldestLSN;
+
+       XLogRecPtr      oldestLSN = InvalidXLogRecPtr;
        bool            invalidated = false;
 
-       XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+       Assert(max_replication_slots >= 0);
+
+       if (max_replication_slots == 0)
+               return invalidated;
+
+       if (!xid)
+               XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, 
oldestLSN);
 
 restart:
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1414,21 +1505,35 @@ restart:
                if (!s->in_use)
                        continue;
 
-               if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
+               if (xid)
                {
-                       /* if the lock was released, start from scratch */
-                       goto restart;
+                       /* we are only dealing with *logical* slot conflicts */
+                       if (!SlotIsLogical(s))
+                               continue;
+
+                       /*
+                        * not the database of interest and we don't want all 
the
+                        * database, skip
+                        */
+                       if (s->data.database != dboid && 
TransactionIdIsValid(*xid))
+                               continue;
                }
+
+               if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated, 
xid))
+                       goto restart;
        }
+
        LWLockRelease(ReplicationSlotControlLock);
 
        /*
-        * If any slots have been invalidated, recalculate the resource limits.
+        * If any slots have been invalidated, recalculate the required xmin and
+        * the required lsn (if appropriate).
         */
        if (invalidated)
        {
                ReplicationSlotsComputeRequiredXmin(false);
-               ReplicationSlotsComputeRequiredLSN();
+               if (!xid)
+                       ReplicationSlotsComputeRequiredLSN();
        }
 
        return invalidated;
diff --git a/src/backend/replication/slotfuncs.c 
b/src/backend/replication/slotfuncs.c
index 2f3c964824..015d276fd9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -319,8 +319,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                 * certain that the slot has been invalidated.  Otherwise, test
                 * availability from restart_lsn.
                 */
-               if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
-                       !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
+               if (ObsoleteSlotIsInvalid(slot, true))
                        walstate = WALAVAIL_REMOVED;
                else
                        walstate = 
GetWALAvailability(slot_contents.data.restart_lsn);
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 75e8363e24..b686691ca2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1253,6 +1253,13 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        ReplicationSlotAcquire(cmd->slotname, true);
 
+       if (LogicalReplicationSlotIsInvalid(MyReplicationSlot))
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("cannot read from logical replication 
slot \"%s\"",
+                                               cmd->slotname),
+                                errdetail("This slot has been invalidated 
because it was conflicting with recovery.")));
+
        if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                ereport(ERROR,
                                
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
diff --git a/src/backend/storage/ipc/procsignal.c 
b/src/backend/storage/ipc/procsignal.c
index 395b2cf690..c85cb5cc18 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
                RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+       if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+               
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
                
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c 
b/src/backend/storage/ipc/standby.c
index 9f56b4e95c..c62245afc7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -24,6 +24,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId 
*waitlist,
  */
 void
 ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+                                                                       bool 
isCatalogRel,
                                                                        
RelFileLocator locator)
 {
        VirtualTransactionId *backends;
@@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
                                                                                
   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                
   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                
   true);
+
+       if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+               InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, 
locator.dbOid, &snapshotConflictHorizon);
 }
 
 /*
@@ -499,6 +504,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
  */
 void
 ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId 
snapshotConflictHorizon,
+                                                                               
   bool isCatalogRel,
                                                                                
   RelFileLocator locator)
 {
        /*
@@ -517,7 +523,9 @@ 
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
                TransactionId truncated;
 
                truncated = XidFromFullTransactionId(snapshotConflictHorizon);
-               ResolveRecoveryConflictWithSnapshot(truncated, locator);
+               ResolveRecoveryConflictWithSnapshot(truncated,
+                                                                               
        isCatalogRel,
+                                                                               
        locator);
        }
 }
 
@@ -1478,6 +1486,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        reasonDesc = _("recovery conflict on snapshot");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       reasonDesc = _("recovery conflict on replication slot");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        reasonDesc = _("recovery conflict on buffer deadlock");
                        break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a10ecbaf50..25e0de4e0f 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        errdetail("User query might have needed to see row 
versions that must be removed.");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       errdetail("User was using the logical slot that must be 
dropped.");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        errdetail("User transaction caused buffer deadlock with 
recovery.");
                        break;
@@ -3143,6 +3146,12 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
                                InterruptPending = true;
                                break;
 
+                       case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                               RecoveryConflictPending = true;
+                               QueryCancelPending = true;
+                               InterruptPending = true;
+                               break;
+
                        default:
                                elog(FATAL, "unrecognized conflict mode: %d",
                                         (int) reason);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8872c80cdf..914b6aebc3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -168,6 +168,58 @@ typedef struct ReplicationSlot
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
 #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
 
+static inline bool
+ObsoleteSlotIsInvalid(ReplicationSlot *s, bool check_invalidated_at)
+{
+       if (check_invalidated_at)
+               return (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
+                               XLogRecPtrIsInvalid(s->data.restart_lsn));
+       else
+               return (XLogRecPtrIsInvalid(s->data.restart_lsn));
+}
+
+static inline bool
+LogicalReplicationSlotIsInvalid(ReplicationSlot *s)
+{
+       return (!TransactionIdIsValid(s->effective_xmin) &&
+                       !TransactionIdIsValid(s->data.catalog_xmin));
+}
+
+static inline bool
+LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
+{
+       TransactionId slot_effective_xmin;
+       TransactionId slot_catalog_xmin;
+
+       slot_effective_xmin = s->effective_xmin;
+       slot_catalog_xmin = s->data.catalog_xmin;
+
+       return (((TransactionIdIsValid(slot_effective_xmin) && 
TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) ||
+                        (TransactionIdIsValid(slot_catalog_xmin) && 
TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))));
+}
+
+static inline bool
+SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+       return (s->data.restart_lsn >= oldestLSN);
+}
+
+static inline bool
+LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid)
+{
+       return (TransactionIdIsValid(*xid) && 
!LogicalReplicationSlotXidsConflict(s, *xid));
+}
+
+static inline bool
+DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr 
*oldestLSN)
+{
+       if (xid)
+               return (LogicalReplicationSlotIsInvalid(s) || 
LogicalSlotIsNotConflicting(s, xid));
+       else
+               return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, 
*oldestLSN));
+
+}
+
 /*
  * Shared memory control area for all of replication slots.
  */
@@ -215,7 +267,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid 
dboid, TransactionId *xid);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool 
need_lock);
 extern int     ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
@@ -227,5 +279,6 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId 
xid, char *reason);
 
 #endif                                                 /* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 905af2231b..2f52100b00 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -42,6 +42,7 @@ typedef enum
        PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
        PROCSIG_RECOVERY_CONFLICT_LOCK,
        PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+       PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
        PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
        PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2effdea126..41f4dc372e 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
+                                                                               
                bool isCatalogRel,
                                                                                
                RelFileLocator locator);
 extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId 
snapshotConflictHorizon,
+                                                                               
                           bool isCatalogRel,
                                                                                
                           RelFileLocator locator);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
-- 
2.34.1

Reply via email to