Hi,

On 4/3/23 8:10 AM, Drouvot, Bertrand wrote:
Hi,

On 4/3/23 7:35 AM, Amit Kapila wrote:
On Mon, Apr 3, 2023 at 4:26 AM Jeff Davis <pg...@j-davis.com> wrote:

Agreed, even Bertrand and myself discussed the same approach few
emails above. BTW, if we have this selective logic to wake
physical/logical walsenders and for standby's, we only wake logical
walsenders at the time of  ApplyWalRecord() then do we need the new
conditional variable enhancement being discussed, and if so, why?


Thank you both for this new idea and discussion. In that case I don't think
we need the new CV API or the use of a CV anymore. As just said up-thread,
I'll submit a new proposal with this new approach.


Please find enclosed V57 implementing the new approach in 0004. With the new
approach in place, the TAP tests (0005) work like a charm (no delay, and even
after a promotion).
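
For anyone who wants to try the feature by hand, the flow the TAP tests
exercise boils down to the following sketch (slot and plugin names here are
only examples, not part of the patches):

```sql
-- On the standby (with hot_standby_feedback = on and, ideally, a physical
-- slot between primary and standby, as recommended in the doc patch):
SELECT * FROM pg_create_logical_replication_slot('standby_slot', 'test_decoding');

-- If the primary is idle, the call above waits for a running-xacts WAL
-- record. On the primary, write one immediately instead of waiting for
-- bgwriter/checkpointer:
SELECT pg_log_standby_snapshot();
```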

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 0d198484e008090a524562076326054be56935ca Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 14:08:11 +0000
Subject: [PATCH v57 6/6] Doc changes describing details about logical
 decoding.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 doc/src/sgml/logicaldecoding.sgml | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)
 100.0% doc/src/sgml/

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 4e912b4bd4..3da254ed1f 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,28 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>
 
+    <para>
+     A logical replication slot can also be created on a hot standby. To
+     prevent <command>VACUUM</command> from removing required rows from the
+     system catalogs, <varname>hot_standby_feedback</varname> should be set
+     on the standby. In spite of that, if any required rows get removed, the
+     slot gets invalidated. It's highly recommended to use a physical slot
+     between the primary and the standby. Otherwise, hot_standby_feedback
+     will work, but only while the connection is alive (for example a node
+     restart would break it). Existing logical slots on the standby also get
+     invalidated if <varname>wal_level</varname> on the primary is reduced to
+     less than <literal>logical</literal>.
+    </para>
+
+    <para>
+     Creating a logical slot requires building a historic snapshot, for which
+     information about all currently running transactions is essential. On
+     the primary, this information is available directly, but on a standby it
+     has to be obtained from the primary. So, slot creation may need to wait
+     for some activity to happen on the primary. If the primary is idle,
+     creating a logical slot on a standby may take a noticeable time. One way
+     to speed it up is to call the
+     <function>pg_log_standby_snapshot</function> function on the primary.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
-- 
2.34.1
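
As a quick way to observe the invalidation behavior the doc patch above
describes (and that the TAP test in 0005 exercises), the conflicting column of
logical slots can be queried directly; this is just an illustrative sketch,
not part of the patches:

```sql
-- On the standby: 't' once recovery has invalidated a logical slot,
-- 'f' while it is still usable, and NULL for physical slots.
SELECT slot_name, slot_type, conflicting
FROM pg_replication_slots;
```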

From 4e392c06e39f36c4185780a21f2b90c7c6a97de4 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 09:04:12 +0000
Subject: [PATCH v57 5/6] New TAP test for logical decoding on standby.

In addition to the new TAP test, this commit introduces a new
pg_log_standby_snapshot() function.

The idea is to be able to take a snapshot of running transactions and write it
to WAL without requesting a (costly) checkpoint.

Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 doc/src/sgml/func.sgml                        |  15 +
 src/backend/access/transam/xlogfuncs.c        |  32 +
 src/backend/catalog/system_functions.sql      |   2 +
 src/include/catalog/pg_proc.dat               |   3 +
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 +
 src/test/recovery/meson.build                 |   1 +
 .../t/035_standby_logical_decoding.pl         | 705 ++++++++++++++++++
 7 files changed, 795 insertions(+)
   3.1% src/backend/
   4.0% src/test/perl/PostgreSQL/Test/
  89.7% src/test/recovery/t/

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 918a492234..939fb8019f 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27034,6 +27034,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         prepared with <xref linkend="sql-prepare-transaction"/>.
        </para></entry>
       </row>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write it to WAL, without
+        having to wait for bgwriter or checkpointer to log one. This is
+        useful for logical decoding on a standby, for which logical slot
+        creation hangs until such a record is replayed on the standby.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index c07daa874f..481e9a47da 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -38,6 +38,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/timestamp.h"
 #include "utils/tuplestore.h"
+#include "storage/standby.h"
 
 /*
  * Backup-related variables.
@@ -196,6 +197,37 @@ pg_switch_wal(PG_FUNCTION_ARGS)
        PG_RETURN_LSN(switchpoint);
 }
 
+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr      recptr;
+
+       if (RecoveryInProgress())
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("recovery is in progress"),
+                                errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
+
+       if (!XLogStandbyInfoActive())
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("wal_level is not sufficient"),
+                                errhint("wal_level must be set to \"replica\" or \"logical\".")));
+
+       recptr = LogStandbySnapshot();
+
+       /*
+        * As a convenience, return the WAL location of the last inserted record.
+        */
+       PG_RETURN_LSN(recptr);
+}
+
 /*
  * pg_create_restore_point: a named point for restore
  *
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 83ca893444..b7c65ea37d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bcbae9036d..284138727e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6426,6 +6426,9 @@
 { oid => '2848', descr => 'switch to new wal file',
   proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn',
   proargtypes => '', prosrc => 'pg_switch_wal' },
+{ oid => '9658', descr => 'log details of the current snapshot to WAL',
+  proname => 'pg_log_standby_snapshot', provolatile => 'v',
+  prorettype => 'pg_lsn', proargtypes => '',
+  prosrc => 'pg_log_standby_snapshot' },
 { oid => '3098', descr => 'create a named restore point',
   proname => 'pg_create_restore_point', provolatile => 'v',
   prorettype => 'pg_lsn', proargtypes => 'text',
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index a3aef8b5e9..62376de602 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3076,6 +3076,43 @@ $SIG{TERM} = $SIG{INT} = sub {
 
 =pod
 
+=item $node->create_logical_slot_on_standby(primary, slot_name, dbname)
+
+Create a logical replication slot on the given standby.
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+       my ($self, $primary, $slot_name, $dbname) = @_;
+       my ($stdout, $stderr);
+
+       my $handle = IPC::Run::start(
+               [
+                       'pg_recvlogical', '-d', $self->connstr($dbname),
+                       '-P', 'test_decoding', '-S', $slot_name, '--create-slot'
+               ],
+               '>', \$stdout, '2>', \$stderr);
+
+       # Once the slot's restart_lsn is determined, the standby looks for an
+       # xl_running_xacts WAL record from the restart_lsn onwards. So first,
+       # wait until the slot's restart_lsn is evaluated.
+       $self->poll_query_until(
+               'postgres', qq[
+               SELECT restart_lsn IS NOT NULL
+               FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+       ]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+       # Now arrange for the xl_running_xacts record for which pg_recvlogical
+       # is waiting.
+       $primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+
+       $handle->finish();
+
+       is($self->slot($slot_name)->{'slot_type'}, 'logical',
+               $slot_name . ' on standby created')
+         or die "could not create slot " . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 59465b97f3..e834ad5e0d 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
       't/034_create_database.pl',
+      't/035_standby_logical_decoding.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
new file mode 100644
index 0000000000..98bc85c140
--- /dev/null
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -0,0 +1,705 @@
+# Test logical decoding on a standby: basic decoding, recovery conflicts
+# and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 67;
+
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret,
+       $handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $node_cascading_standby =
+  PostgreSQL::Test::Cluster->new('cascading_standby');
+my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $res;
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+my $standby_physical_slotname = 'standby_physical';
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+       my ($node, $pat, $off) = @_;
+
+       $off = 0 unless defined $off;
+       my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+       return 0 if (length($log) <= $off);
+
+       $log = substr($log, $off);
+
+       return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+       my ($node, $slotname, $check_expr) = @_;
+
+       $node->poll_query_until(
+               'postgres', qq[
+               SELECT $check_expr
+               FROM pg_catalog.pg_replication_slots
+               WHERE slot_name = '$slotname';
+       ]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+       my ($node) = @_;
+       $node->create_logical_slot_on_standby($node_primary, 'inactiveslot',
+               'testdb');
+       $node->create_logical_slot_on_standby($node_primary, 'activeslot',
+               'testdb');
+}
+
+# Drop the logical slots on standby.
+sub drop_logical_slots
+{
+       $node_standby->psql('postgres',
+               q[SELECT pg_drop_replication_slot('inactiveslot')]);
+       $node_standby->psql('postgres',
+               q[SELECT pg_drop_replication_slot('activeslot')]);
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# If wait is true, wait for an active pid on the 'activeslot' slot; if not,
+# we are testing a known failure scenario and no wait is needed.
+sub make_slot_active
+{
+       my ($node, $wait, $to_stdout, $to_stderr) = @_;
+       my $slot_user_handle;
+
+       $slot_user_handle = IPC::Run::start(
+               [
+                       'pg_recvlogical', '-d', $node->connstr('testdb'),
+                       '-S', 'activeslot', '-o', 'include-xids=0',
+                       '-o', 'skip-empty-xacts=1', '--no-loop', '--start',
+                       '-f', '-'
+               ],
+               '>', $to_stdout, '2>', $to_stderr);
+
+       if ($wait)
+       {
+               # make sure activeslot is in use
+               $node->poll_query_until('testdb',
+                       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+               ) or die "slot never became active";
+       }
+       return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+       my ($slot_user_handle, $check_stderr) = @_;
+       my $return;
+
+       # our client should've terminated in response to the walsender error
+       $slot_user_handle->finish;
+       $return = $?;
+       cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+       if ($return) {
+               like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+       }
+
+       return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+       my ($slot_user_handle) = @_;
+
+       is($node_standby->slot('inactiveslot')->{'slot_type'}, '',
+               'inactiveslot on standby dropped');
+       is($node_standby->slot('activeslot')->{'slot_type'}, '',
+               'activeslot on standby dropped');
+
+       check_pg_recvlogical_stderr($slot_user_handle,
+               "conflict with recovery");
+}
+
+# Change hot_standby_feedback on the standby, and wait for the physical
+# slot's xmins on the primary to reach the expected state.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+       my ($hsf, $invalidated) = @_;
+
+       $node_standby->append_conf('postgresql.conf',qq[
+       hot_standby_feedback = $hsf
+       ]);
+
+       $node_standby->reload;
+
+       if ($hsf && $invalidated)
+       {
+               # With hot_standby_feedback on, xmin should advance, but
+               # catalog_xmin should still remain NULL since there is no
+               # logical slot.
+               wait_for_xmins($node_primary, $primary_slotname,
+                       "xmin IS NOT NULL AND catalog_xmin IS NULL");
+       }
+       elsif ($hsf)
+       {
+               # With hot_standby_feedback on, xmin and catalog_xmin should
+               # advance.
+               wait_for_xmins($node_primary, $primary_slotname,
+                       "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+       }
+       else
+       {
+               # Both should be NULL since hs_feedback is off
+               wait_for_xmins($node_primary, $primary_slotname,
+                       "xmin IS NULL AND catalog_xmin IS NULL");
+       }
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+       my ($conflicting) = @_;
+
+       if ($conflicting)
+       {
+               $res = $node_standby->safe_psql('postgres',
+                       qq(select bool_and(conflicting) from pg_replication_slots;));
+
+               is($res, 't', "Logical slots are reported as conflicting");
+       }
+       else
+       {
+               $res = $node_standby->safe_psql('postgres',
+                       qq(select bool_or(conflicting) from pg_replication_slots;));
+
+               is($res, 'f', "Logical slots are reported as non conflicting");
+       }
+}
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb',
+       qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]);
+
+# Check conflicting is NULL for physical slot
+$res = $node_primary->safe_psql('postgres', qq[
+       SELECT conflicting IS NULL FROM pg_replication_slots
+       WHERE slot_name = '$primary_slotname';]);
+
+is($res, 't', "Physical slot reports conflicting as NULL");
+
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+       $node_primary, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->safe_psql('testdb',
+       qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]);
+
+#######################
+# Initialize cascading standby node
+#######################
+$node_standby->backup($backup_name);
+$node_cascading_standby->init_from_backup(
+       $node_standby, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_cascading_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$standby_physical_slotname']);
+$node_cascading_standby->start;
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$node_primary->safe_psql('testdb',
+       qq[CREATE TABLE decoding_test(x integer, y text);]);
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $result = $node_standby->safe_psql('testdb',
+       qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+       14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+       "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;");
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]);
+
+$node_primary->wait_for_catchup($node_standby);
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+               'otherdb',
+               "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"),
+       3,
+       'replaying logical slot from another database fails');
+
+# drop the logical slots
+drop_logical_slots();
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce a recovery conflict is to create/drop a relation and
+# launch a vacuum full on pg_class with hot_standby_feedback turned off on
+# the standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb',
+       qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM full pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+               $node_standby,
+               "invalidating slot \"inactiveslot\" because it conflicts with recovery"),
+       'inactiveslot slot invalidation is logged with vacuum FULL on pg_class');
+
+ok( find_in_log(
+               $node_standby,
+               "invalidating slot \"activeslot\" because it conflicts with recovery"),
+       'activeslot slot invalidation is logged with vacuum FULL on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
+# updated.
+ok( $node_standby->poll_query_until(
+               'postgres',
+               "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated')
+  or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle,
+       "cannot read from logical replication slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1,1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce a recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the
+# standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb',
+       qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+               $node_standby,
+               "invalidating slot \"inactiveslot\" because it conflicts with recovery", $logstart),
+       'inactiveslot slot invalidation is logged with vacuum on pg_class');
+
+ok( find_in_log(
+               $node_standby,
+               "invalidating slot \"activeslot\" because it conflicts with recovery", $logstart),
+       'activeslot slot invalidation is logged with vacuum on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
+# updated. We now expect 2 conflicts reported, as the counter persists across
+# reloads.
+ok( $node_standby->poll_query_until(
+               'postgres',
+               "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated')
+  or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle,
+       "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a non catalog table
+# Scenario 3: No conflict expected.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# put hot standby feedback to off
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should not trigger a conflict
+$node_primary->safe_psql('testdb',
+       qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+$node_primary->safe_psql('testdb', qq[UPDATE conflict_test set x=1, y=1;]);
+$node_primary->safe_psql('testdb', 'VACUUM conflict_test;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should not be issued
+ok( !find_in_log(
+               $node_standby,
+               "invalidating slot \"inactiveslot\" because it conflicts with recovery", $logstart),
+       'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
+
+ok( !find_in_log(
+               $node_standby,
+               "invalidating slot \"activeslot\" because it conflicts with recovery", $logstart),
+       'activeslot slot invalidation is not logged with vacuum on conflict_test');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not
+# been updated. We still expect 2 conflicts reported, as the counter persists
+# across reloads.
+ok( $node_standby->poll_query_until(
+               'postgres',
+               "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot not updated')
+  or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as non conflicting in pg_replication_slots
+check_slots_conflicting_status(0);
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 0);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 4: conflict due to on-access pruning.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to trigger an on-access pruning
+# on a relation marked as user_catalog_table.
+change_hot_standby_feedback_and_wait_for_xmins(0,0);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb',
+       qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+               $node_standby,
+               "invalidating slot \"inactiveslot\" because it conflicts with recovery", $logstart),
+       'inactiveslot slot invalidation is logged with on-access pruning');
+
+ok( find_in_log(
+               $node_standby,
+               "invalidating slot \"activeslot\" because it conflicts with recovery", $logstart),
+       'activeslot slot invalidation is logged with on-access pruning');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
+# updated. We now expect 3 conflicts reported, as the counter persists across
+# reloads.
+ok( $node_standby->poll_query_until(
+               'postgres',
+               "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated')
+  or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle,
+       "cannot read from logical replication slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 5: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"inactiveslot\" because it conflicts with recovery", $logstart),
+  'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"activeslot\" because it conflicts with recovery", $logstart),
+  'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
+# updated. We now expect 4 conflicts reported, as the counter persists across
+# reloads.
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 4) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level to be at
+# least logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires wal_level to be at least logical on the primary server");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_replay_catchup($node_standby);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# DROP DATABASE should drop its slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+is($node_standby->safe_psql('postgres',
+       q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+       'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+       'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_standby->reload;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# create the logical slots on the cascading standby too
+create_logical_slots($node_cascading_standby);
+
+# Make the slots active
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 1, \$cascading_stdout, \$cascading_stderr);
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+# Wait for both standbys to catch up
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
+);
+
+# Wait for the cascading standby to catch up
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre- and post-promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby');
+
+# check that we are decoding pre- and post-promotion inserted rows
+# with a pg_recvlogical session started before the promotion
+my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+ok( pump_until(
+        $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($stdout);
+is($stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+# check that we are decoding pre- and post-promotion inserted rows on the
+# cascading standby
+$stdout_sql = $node_cascading_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby');
+
+# check that we are decoding pre- and post-promotion inserted rows with
+# a pg_recvlogical session started before the promotion on the cascading standby
+ok( pump_until(
+        $cascading_handle, $pump_timeout, \$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($cascading_stdout);
+is($cascading_stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session on cascading standby');
-- 
2.34.1

From 5ab4bb1f376c0a0e7e270f4290668407abf40de9 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 16:46:09 +0000
Subject: [PATCH v57 4/6] Fixing Walsender corner case with logical decoding on
 standby.

The problem is that WalSndWaitForWal() waits for the *replay* LSN to
increase, but gets woken up by the walreceiver when new WAL has merely
been flushed. That means walsenders are typically woken up at the same
time as the startup process, so by the time a logical walsender checks
GetXLogReplayRecPtr() it is unlikely that the startup process has
already replayed the record and updated XLogCtl->lastReplayedEndRecPtr.

Introduce a new ReplicationKind field in the WalSnd struct and move the
call to WalSndWakeup() into ApplyWalRecord().

The new field makes it possible to wake up only the kind of walsender
relevant to the code path at hand.
---
---
 src/backend/access/transam/xlog.c           |  6 +++---
 src/backend/access/transam/xlogarchive.c    |  2 +-
 src/backend/access/transam/xlogrecovery.c   | 10 ++++------
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         | 13 +++++++++++--
 src/include/replication/walsender.h         | 20 ++++++++++----------
 src/include/replication/walsender_private.h |  3 +++
 7 files changed, 33 insertions(+), 23 deletions(-)
  32.5% src/backend/access/transam/
  28.5% src/backend/replication/
  38.9% src/include/replication/

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 779f5c3711..70ac8fc33b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
        END_CRIT_SECTION();
 
        /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
        /*
         * If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
        END_CRIT_SECTION();
 
        /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
        /*
         * Great, done. To take some work off the critical path, try to initialize
@@ -5773,7 +5773,7 @@ StartupXLOG(void)
         * If there were cascading standby servers connected to us, nudge any wal
         * sender processes to notice that we've been promoted.
         */
-       WalSndWakeup();
+       WalSndWakeup(true, true);
 
        /*
         * If this was a promotion, request an (online) checkpoint now. This isn't
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index a0f5aa24b5..d06fdc74c0 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
         * if we restored something other than a WAL segment, but it does no harm
         * either.
         */
-       WalSndWakeup();
+       WalSndWakeup(true, true);
 }
 
 /*
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..18551cc3b3 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1935,6 +1935,10 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
        XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
        SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+       /* Wakeup walsender(s) */
+       WalSndWakeup(switchedTLI && AllowCascadeReplication(),
+                                switchedTLI || RecoveryInProgress());
+
        /*
         * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
         * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1962,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
                 */
                RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
 
-               /*
-                * Wake up any walsenders to notice that we are on a new timeline.
-                */
-               if (AllowCascadeReplication())
-                       WalSndWakeup();
-
                /* Reset the prefetcher. */
                XLogPrefetchReconfigure();
        }
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 685af51d5d..d2aa93734c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
                /* Signal the startup process and walsender that new WAL has arrived */
                WakeupRecovery();
                if (AllowCascadeReplication())
-                       WalSndWakeup();
+                       WalSndWakeup(true, !RecoveryInProgress());
 
                /* Report XLOG streaming progress in PS display */
                if (update_process_title)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aeb5f93514..d5d1d5600c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2626,6 +2626,12 @@ InitWalSenderSlot(void)
                        walsnd->sync_standby_priority = 0;
                        walsnd->latch = &MyProc->procLatch;
                        walsnd->replyTime = 0;
+
+                       if (MyDatabaseId == InvalidOid)
+                               walsnd->kind = REPLICATION_KIND_PHYSICAL;
+                       else
+                               walsnd->kind = REPLICATION_KIND_LOGICAL;
+
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        MyWalSnd = (WalSnd *) walsnd;
@@ -3314,13 +3320,14 @@ WalSndShmemInit(void)
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
        int                     i;
 
        for (i = 0; i < max_wal_senders; i++)
        {
                Latch      *latch;
+               ReplicationKind kind;
                WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
                /*
@@ -3329,9 +3336,11 @@ WalSndWakeup(void)
                 */
                SpinLockAcquire(&walsnd->mutex);
                latch = walsnd->latch;
+               kind = walsnd->kind;
                SpinLockRelease(&walsnd->mutex);
 
-               if (latch != NULL)
+               if (latch != NULL && ((physical && kind == 
REPLICATION_KIND_PHYSICAL) ||
+                                                         (logical && kind == 
REPLICATION_KIND_LOGICAL)))
                        SetLatch(latch);
        }
 }
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae..c6e4515201 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
 extern void WalSndInitStopping(void);
 extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
 /*
  * wakeup walsenders if there is work to be done
  */
-#define WalSndWakeupProcessRequests()          \
-       do                                                                      \
-       {                                                                       \
-               if (wake_wal_senders)                           \
-               {                                                               \
-                       wake_wal_senders = false;               \
-                       if (max_wal_senders > 0)                \
-                               WalSndWakeup();                         \
-               }                                                               \
+#define WalSndWakeupProcessRequests(physical, logical)         \
+       do                                                                      \
+       {                                                                       \
+               if (wake_wal_senders)                                           \
+               {                                                               \
+                       wake_wal_senders = false;                               \
+                       if (max_wal_senders > 0)                                \
+                               WalSndWakeup(physical, logical);                \
+               }                                                               \
        } while (0)
 
 #endif                                                 /* _WALSENDER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5310e054c4..ff25aa70a8 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -15,6 +15,7 @@
 #include "access/xlog.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
+#include "nodes/replnodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
         * Timestamp of the last message received from standby.
         */
        TimestampTz replyTime;
+
+       ReplicationKind kind;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;
-- 
2.34.1

From fed8b2ff9ce61cee5dc9c06d9e3287b877a29527 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 12:45:20 +0000
Subject: [PATCH v57 3/6] Allow logical decoding on standby.

Allow a logical slot to be created on a standby. Restrict its usage,
or its creation, if wal_level on the primary is less than logical.
During slot creation, its restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on a standby waits for an
xl_running_xacts record to arrive from the primary.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
---
 src/backend/access/transam/xlog.c         | 11 +++++
 src/backend/replication/logical/decode.c  | 22 ++++++++-
 src/backend/replication/logical/logical.c | 36 +++++++-------
 src/backend/replication/slot.c            | 58 ++++++++++++-----------
 src/backend/replication/walsender.c       | 46 +++++++++++-------
 src/include/access/xlog.h                 |  1 +
 6 files changed, 113 insertions(+), 61 deletions(-)
   4.6% src/backend/access/transam/
  37.7% src/backend/replication/logical/
  56.7% src/backend/replication/

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 10085aa0d6..779f5c3711 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4466,6 +4466,17 @@ LocalProcessControlFile(bool reset)
        ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+       return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..b37b91bbe0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                         * can restart from there.
                         */
                        break;
+               case XLOG_PARAMETER_CHANGE:
+                       {
+                               xl_parameter_change *xlrec =
+                               (xl_parameter_change *) XLogRecGetData(buf->record);
+
+                               /*
+                                * If wal_level on the primary is reduced to less than
+                                * logical, then we want to prevent existing logical slots
+                                * from being used. Existing logical slots on the standby
+                                * get invalidated when this WAL record is replayed; and
+                                * further, slot creation fails when the wal_level is not
+                                * sufficient; but all these operations are not
+                                * synchronized, so a logical slot may creep in while the
+                                * wal_level is being reduced.
+                                * Hence this extra check.
+                                */
+                               if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                        errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+                               break;
+                       }
                case XLOG_NOOP:
                case XLOG_NEXTOID:
                case XLOG_SWITCH:
                case XLOG_BACKUP_END:
-               case XLOG_PARAMETER_CHANGE:
                case XLOG_RESTORE_POINT:
                case XLOG_FPW_CHANGE:
                case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a6..60a5008b6d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("logical decoding requires a database connection")));
 
-       /* ----
-        * TODO: We got to change that someday soon...
-        *
-        * There's basically three things missing to allow this:
-        * 1) We need to be able to correctly and quickly identify the timeline a
-        *        LSN belongs to
-        * 2) We need to force hot_standby_feedback to be enabled at all times so
-        *        the primary cannot remove rows we need.
-        * 3) support dropping replication slots referring to a database, in
-        *        dbase_redo. There can't be any active ones due to HS recovery
-        *        conflicts, so that should be relatively easy.
-        * ----
-        */
        if (RecoveryInProgress())
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("logical decoding cannot be used while in recovery")));
+       {
+               /*
+                * This check may have race conditions, but whenever
+                * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+                * verify that there are no existing logical replication slots. And to
+                * avoid races around creating a new slot,
+                * CheckLogicalDecodingRequirements() is called once before creating
+                * the slot, and once when logical decoding is initially starting up.
+                */
+               if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+       }
 }
 
 /*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
        LogicalDecodingContext *ctx;
        MemoryContext old_context;
 
+       /*
+        * On standby, this check is also required while creating the slot. Check
+        * the comments in this function.
+        */
+       CheckLogicalDecodingRequirements();
+
        /* shorter lines... */
        slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 265ed0f84c..0091793658 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -41,6 +41,7 @@
 
 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -1183,37 +1184,28 @@ ReplicationSlotReserveWal(void)
                /*
                 * For logical slots log a standby snapshot and start logical decoding
                 * at exactly that position. That allows the slot to start up more
-                * quickly.
+                * quickly. But on a standby we cannot do WAL writes, so just use the
+                * replay pointer; effectively, an attempt to create a logical slot on
+                * standby will cause it to wait for an xl_running_xacts record to be
+                * logged independently on the primary, so that a snapshot can be
+                * built using the record.
                 *
-                * That's not needed (or indeed helpful) for physical slots as they'll
-                * start replay at the last logged checkpoint anyway. Instead return
-                * the location of the last redo LSN. While that slightly increases
-                * the chance that we have to retry, it's where a base backup has to
-                * start replay at.
+                * None of this is needed (or indeed helpful) for physical slots as
+                * they'll start replay at the last logged checkpoint anyway. Instead
+                * return the location of the last redo LSN. While that slightly
+                * increases the chance that we have to retry, it's where a base
+                * backup has to start replay at.
                 */
-               if (!RecoveryInProgress() && SlotIsLogical(slot))
-               {
-                       XLogRecPtr      flushptr;
-
-                       /* start at current insert position */
+               if (SlotIsPhysical(slot))
+                       restart_lsn = GetRedoRecPtr();
+               else if (RecoveryInProgress())
+                       restart_lsn = GetXLogReplayRecPtr(NULL);
+               else
                        restart_lsn = GetXLogInsertRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-
-                       /* make sure we have enough information to start */
-                       flushptr = LogStandbySnapshot();
-
-                       /* and make sure it's fsynced to disk */
-                       XLogFlush(flushptr);
-               }
-               else
-               {
-                       restart_lsn = GetRedoRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-               }
+               SpinLockAcquire(&slot->mutex);
+               slot->data.restart_lsn = restart_lsn;
+               SpinLockRelease(&slot->mutex);
 
                /* prevent WAL removal as fast as possible */
                ReplicationSlotsComputeRequiredLSN();
@@ -1229,8 +1221,18 @@ ReplicationSlotReserveWal(void)
                if (XLogGetLastRemovedSegno() < segno)
                        break;
        }
-}
 
+       if (!RecoveryInProgress() && SlotIsLogical(slot))
+       {
+               XLogRecPtr      flushptr;
+
+               /* make sure we have enough information to start */
+               flushptr = LogStandbySnapshot();
+
+               /* and make sure it's fsynced to disk */
+               XLogFlush(flushptr);
+       }
+}
 
 /*
  * Report terminating or conflicting message.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b686691ca2..aeb5f93514 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,32 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        int                     count;
        WALReadError errinfo;
        XLogSegNo       segno;
-       TimeLineID      currTLI = GetWALInsertionTimeLine();
+       TimeLineID      currTLI;
 
        /*
-        * Since logical decoding is only permitted on a primary server, we know
-        * that the current timeline ID can't be changing any more. If we did this
-        * on a standby, we'd have to worry about the values we compute here
-        * becoming invalid due to a promotion or timeline change.
+        * Since logical decoding is also permitted on a standby server, we need
+        * to check if the server is in recovery to decide how to get the current
+        * timeline ID (so that it also covers the promotion or timeline change
+        * cases).
         */
+
+       /* make sure we have enough WAL available */
+       flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+       /* the standby could have been promoted, so check if still in recovery */
+       am_cascading_walsender = RecoveryInProgress();
+
+       if (am_cascading_walsender)
+               GetXLogReplayRecPtr(&currTLI);
+       else
+               currTLI = GetWALInsertionTimeLine();
+
        XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
        sendTimeLineIsHistoric = (state->currTLI != currTLI);
        sendTimeLine = state->currTLI;
        sendTimeLineValidUpto = state->currTLIValidUntil;
        sendTimeLineNextTLI = state->nextTLI;
 
-       /* make sure we have enough WAL available */
-       flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
        /* fail if not (implies we are going to shut down) */
        if (flushptr < targetPagePtr + reqLen)
                return -1;
@@ -937,9 +946,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
                                 cur_page,
                                 targetPagePtr,
                                 XLOG_BLCKSZ,
-                                state->seg.ws_tli, /* Pass the current TLI because only
-                                                                        * WalSndSegmentOpen controls whether new
-                                                                        * TLI is needed. */
+                                currTLI,               /* Pass the current TLI because only
+                                                                * WalSndSegmentOpen controls whether new TLI
+                                                                * is needed. */
                                 &errinfo))
                WALReadRaiseError(&errinfo);
 
@@ -3073,10 +3082,14 @@ XLogSendLogical(void)
         * If first time through in this session, initialize flushPtr.  Otherwise,
         * we only need to update flushPtr if EndRecPtr is past it.
         */
-       if (flushPtr == InvalidXLogRecPtr)
-               flushPtr = GetFlushRecPtr(NULL);
-       else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-               flushPtr = GetFlushRecPtr(NULL);
+       if (flushPtr == InvalidXLogRecPtr ||
+               logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+       {
+               if (am_cascading_walsender)
+                       flushPtr = GetStandbyFlushRecPtr(NULL);
+               else
+                       flushPtr = GetFlushRecPtr(NULL);
+       }
 
        /* If EndRecPtr is still past our flushPtr, it means we caught up. */
        if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3167,7 +3180,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
        receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-       *tli = replayTLI;
+       if (tli)
+               *tli = replayTLI;
 
        result = replayPtr;
        if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
-- 
2.34.1

From 296d282f663f0bc6ba97d0896b0e95d7ff2d5994 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 11:28:30 +0000
Subject: [PATCH v57 2/6] Arrange for new pg_stat_database_conflicts and
 pg_replication_slots fields

Since logical slot conflicts on standby are handled by the previous commit, we
can now expose the conflict in pg_stat_database_conflicts and pg_replication_slots.

Adding:

- confl_active_logicalslot in pg_stat_database_conflicts
- conflicting in pg_replication_slots

to do so.
---
 doc/src/sgml/monitoring.sgml                 | 11 +++++++++++
 doc/src/sgml/system-views.sgml               | 10 ++++++++++
 src/backend/catalog/system_views.sql         |  6 ++++--
 src/backend/replication/slotfuncs.c          | 13 ++++++++++++-
 src/backend/utils/activity/pgstat_database.c |  4 ++++
 src/backend/utils/adt/pgstatfuncs.c          |  3 +++
 src/include/catalog/pg_proc.dat              | 11 ++++++++---
 src/include/pgstat.h                         |  1 +
 src/test/regress/expected/rules.out          |  8 +++++---
 9 files changed, 58 insertions(+), 9 deletions(-)
  32.6% doc/src/sgml/
   7.9% src/backend/catalog/
  15.7% src/backend/replication/
   5.8% src/backend/utils/activity/
   5.5% src/backend/utils/adt/
  23.9% src/include/catalog/
   6.7% src/test/regress/expected/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index d5a45f996d..87fd10401d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4674,6 +4674,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        deadlocks
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of active logical slots in this database that have been
+       invalidated because they conflict with recovery (note that inactive ones
+       are also invalidated but do not increment this counter)
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index bb1a418450..57b228076e 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        false for physical slots.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>conflicting</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if this logical slot conflicted with recovery (and so is now
+       invalidated). Always NULL for physical slots.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 574cbc2e44..3a8088ac03 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -999,7 +999,8 @@ CREATE VIEW pg_replication_slots AS
             L.confirmed_flush_lsn,
             L.wal_status,
             L.safe_wal_size,
-            L.two_phase
+            L.two_phase,
+            L.conflicting
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
@@ -1067,7 +1068,8 @@ CREATE VIEW pg_stat_database_conflicts AS
             pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
             pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
             pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
-            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+            pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_active_logicalslot
     FROM pg_database D;
 
 CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 015d276fd9..1f1f076558 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        XLogRecPtr      currlsn;
        int                     slotno;
@@ -403,6 +403,17 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
                values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+               if (slot_contents.data.database == InvalidOid)
+                       nulls[i++] = true;
+               else
+               {
+                       if (slot_contents.data.xmin == InvalidTransactionId &&
+                               slot_contents.data.catalog_xmin == InvalidTransactionId)
+                               values[i++] = BoolGetDatum(true);
+                       else
+                               values[i++] = BoolGetDatum(false);
+               }
+
                Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
                tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c
index 6e650ceaad..7149f22f72 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
                case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
                        dbentry->conflict_bufferpin++;
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       dbentry->conflict_logicalslot++;
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        dbentry->conflict_startup_deadlock++;
                        break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
        PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
        PGSTAT_ACCUM_DBCOUNT(conflict_lock);
        PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
+       PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
        PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
        PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index eec9f3cf9b..4de60d8aa1 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1066,6 +1066,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
 /* pg_stat_get_db_xact_rollback */
 PG_STAT_GET_DBENTRY_INT64(xact_rollback)
 
+/* pg_stat_get_db_conflict_logicalslot */
+PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
 
 Datum
 pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
@@ -1099,6 +1101,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
                result = (int64) (dbentry->conflict_tablespace +
                                                  dbentry->conflict_lock +
                                                  dbentry->conflict_snapshot +
+                                                 dbentry->conflict_logicalslot +
                                                  dbentry->conflict_bufferpin +
                                                  dbentry->conflict_startup_deadlock);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f9f2642201..bcbae9036d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5605,6 +5605,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '9901',
  descr => 'statistics: recovery conflicts in database caused by logical replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
  descr => 'statistics: recovery conflicts in database caused by shared buffer pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
@@ -11071,9 +11076,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 75d258d921..fa3d326d86 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -331,6 +331,7 @@ typedef struct PgStat_StatDBEntry
        PgStat_Counter conflict_tablespace;
        PgStat_Counter conflict_lock;
        PgStat_Counter conflict_snapshot;
+       PgStat_Counter conflict_logicalslot;
        PgStat_Counter conflict_bufferpin;
        PgStat_Counter conflict_startup_deadlock;
        PgStat_Counter temp_files;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index ab1aebfde4..06d3f1f5d3 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.confirmed_flush_lsn,
     l.wal_status,
     l.safe_wal_size,
-    l.two_phase
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+    l.two_phase,
+    l.conflicting
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
@@ -1869,7 +1870,8 @@ pg_stat_database_conflicts| SELECT oid AS datid,
     pg_stat_get_db_conflict_lock(oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(oid) AS confl_snapshot,
     pg_stat_get_db_conflict_bufferpin(oid) AS confl_bufferpin,
-    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock
+    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock,
+    pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot
    FROM pg_database d;
 pg_stat_gssapi| SELECT pid,
     gss_auth AS gss_authenticated,
-- 
2.34.1
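[Reviewer note, not part of the patch] The new `conflicting` output column added above in pg_get_replication_slots() follows a three-valued rule: NULL for physical slots (no database), true once invalidation has cleared both xmin and catalog_xmin, false otherwise. A minimal sketch of that rule, with PostgreSQL's Oid/TransactionId reduced to plain integers (the names below are illustrative, not the patch's):

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Oid;
typedef uint32_t TransactionId;
#define InvalidOid ((Oid) 0)
#define InvalidTransactionId ((TransactionId) 0)

/* Tri-state result mirroring the SQL column: NULL / true / false. */
typedef enum { CONFLICTING_NULL, CONFLICTING_TRUE, CONFLICTING_FALSE } ConflictingCol;

/*
 * Sketch of the logic added to pg_get_replication_slots(): physical slots
 * (no database) report NULL; a logical slot reports true once invalidation
 * has cleared both xmin and catalog_xmin.
 */
static ConflictingCol
conflicting_column(Oid database, TransactionId xmin, TransactionId catalog_xmin)
{
    if (database == InvalidOid)
        return CONFLICTING_NULL;
    if (xmin == InvalidTransactionId && catalog_xmin == InvalidTransactionId)
        return CONFLICTING_TRUE;
    return CONFLICTING_FALSE;
}
```

On a standby one would then watch `SELECT slot_name, conflicting FROM pg_replication_slots;` and the per-database counter via `SELECT datname, confl_active_logicalslot FROM pg_stat_database_conflicts;`.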

From 90675b0797a698655fca84c1eb061de0aa7bc996 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 08:57:56 +0000
Subject: [PATCH v57 1/6] Handle logical slot conflicts on standby.

During WAL replay on the standby, when a slot conflict is identified,
invalidate such slots. Do the same if wal_level on the primary server
is reduced to below logical while logical slots exist on the standby.
Introduce a new ProcSignalReason value for slot conflict recovery.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello,
Bharath Rupireddy
---
 src/backend/access/gist/gistxlog.c            |   2 +
 src/backend/access/hash/hash_xlog.c           |   1 +
 src/backend/access/heap/heapam.c              |   3 +
 src/backend/access/nbtree/nbtxlog.c           |   2 +
 src/backend/access/spgist/spgxlog.c           |   1 +
 src/backend/access/transam/xlog.c             |  21 +-
 .../replication/logical/logicalfuncs.c        |  13 +-
 src/backend/replication/slot.c                | 190 ++++++++++++++----
 src/backend/replication/slotfuncs.c           |   3 +-
 src/backend/replication/walsender.c           |   7 +
 src/backend/storage/ipc/procsignal.c          |   3 +
 src/backend/storage/ipc/standby.c             |  13 +-
 src/backend/tcop/postgres.c                   |  28 +++
 src/include/replication/slot.h                |  57 +++++-
 src/include/storage/procsignal.h              |   1 +
 src/include/storage/standby.h                 |   2 +
 16 files changed, 296 insertions(+), 51 deletions(-)
   7.1% src/backend/access/transam/
   5.3% src/backend/replication/logical/
  58.6% src/backend/replication/
   5.0% src/backend/storage/ipc/
   7.8% src/backend/tcop/
  12.6% src/include/replication/
   3.3% src/
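[Reviewer note, not part of the patch] The invalidation decision implemented below can be summarized as: a logical slot on the standby conflicts when the WAL being replayed removes catalog rows the slot still needs, or when the primary's wal_level drops below logical (signalled by an invalid conflict horizon). A simplified sketch under those assumptions; the real code also handles the obsolete-restart_lsn case, and real TransactionId comparison must be wraparound-aware, which is ignored here:

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/*
 * Simplified conflict test for a logical slot on a standby: an invalid
 * conflict horizon means the primary dropped wal_level below logical, so
 * every logical slot conflicts; otherwise the slot conflicts when its
 * catalog_xmin is no newer than the horizon of removed catalog rows.
 */
static bool
logical_slot_conflicts(TransactionId catalog_xmin, TransactionId conflict_horizon)
{
    if (conflict_horizon == InvalidTransactionId)
        return true;            /* wal_level dropped below logical on primary */
    return catalog_xmin <= conflict_horizon;    /* ignores xid wraparound */
}
```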

diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index b7678f3c14..9a86fb3fef 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                                       xldata->isCatalogRel,
                                                                                        rlocator);
        }
 
@@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
         */
        if (InHotStandby)
                ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                                                  xlrec->isCatalogRel,
                                                                                                   xlrec->locator);
 }
 
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index f2dd9be8d3..e8e06c62a9 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                                       xldata->isCatalogRel,
                                                                                        rlocator);
        }
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f7d9ce59a4..371e855683 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8717,6 +8717,7 @@ heap_xlog_prune(XLogReaderState *record)
         */
        if (InHotStandby)
                ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                                       xlrec->isCatalogRel,
                                                                                        rlocator);
 
        /*
@@ -8888,6 +8889,7 @@ heap_xlog_visible(XLogReaderState *record)
         */
        if (InHotStandby)
                ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                                       xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
                                                                                        rlocator);
 
        /*
@@ -9009,6 +9011,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                                       xlrec->isCatalogRel,
                                                                                        rlocator);
        }
 
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 414ca4f6de..c87e46ed66 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                                       xlrec->isCatalogRel,
                                                                                        rlocator);
        }
 
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 
        if (InHotStandby)
                ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                                                  xlrec->isCatalogRel,
                                                                                                   xlrec->locator);
 }
 
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index b071b59c8a..459ac929ba 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
                ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                                       xldata->isCatalogRel,
                                                                                        locator);
        }
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 46821ad605..10085aa0d6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6806,7 +6806,7 @@ CreateCheckPoint(int flags)
         */
        XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
        KeepLogSeg(recptr, &_logSegNo);
-       if (InvalidateObsoleteReplicationSlots(_logSegNo))
+       if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, NULL))
        {
                /*
                 * Some slots have been invalidated; recalculate the old-segment
@@ -7250,7 +7250,7 @@ CreateRestartPoint(int flags)
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
        endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
        KeepLogSeg(endptr, &_logSegNo);
-       if (InvalidateObsoleteReplicationSlots(_logSegNo))
+       if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, NULL))
        {
                /*
                 * Some slots have been invalidated; recalculate the old-segment
@@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
                /* Update our copy of the parameters in pg_control */
                memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
+               /*
+                * Invalidate logical slots if we are in hot standby and the primary
+                * does not have a WAL level sufficient for logical decoding. No need
+                * to search for potentially conflicting logical slots if the standby
+                * is running with wal_level lower than logical, because in that case
+                * we would have either disallowed creation of logical slots or
+                * invalidated existing ones.
+                */
+               if (InRecovery && InHotStandby &&
+                       xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+                       wal_level >= WAL_LEVEL_LOGICAL)
+               {
+                       TransactionId ConflictHorizon = InvalidTransactionId;
+
+                       InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, InvalidOid, &ConflictHorizon);
+               }
+
                LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->MaxConnections = xlrec.MaxConnections;
                ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b..070fd378e8 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -216,9 +216,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
                /*
                 * After the sanity checks in CreateDecodingContext, make sure the
-                * restart_lsn is valid.  Avoid "cannot get changes" wording in this
-                * errmsg because that'd be confusingly ambiguous about no changes
-                * being available.
+                * restart_lsn is valid or both xmin and catalog_xmin are valid. Avoid
+                * "cannot get changes" wording in this errmsg because that'd be
+                * confusingly ambiguous about no changes being available.
                 */
                if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                        ereport(ERROR,
@@ -227,6 +227,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
                                                        NameStr(*name)),
                                         errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
 
+               if (LogicalReplicationSlotIsInvalid(MyReplicationSlot))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("cannot read from logical replication slot \"%s\"",
+                                                       NameStr(*name)),
+                                        errdetail("This slot has been invalidated because it was conflicting with recovery.")));
+
                MemoryContextSwitchTo(oldcontext);
 
                /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2293c0c6fc..265ed0f84c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -110,6 +110,13 @@ static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
 
+/* to report termination/invalidation */
+static void ReportTerminationInvalidation(bool terminating, bool islogical,
+                                                                                 int pid, NameData slotname,
+                                                                                 TransactionId *xid,
+                                                                                 XLogRecPtr restart_lsn,
+                                                                                 XLogRecPtr oldestLSN);
+
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
  */
@@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
                SpinLockAcquire(&s->mutex);
                effective_xmin = s->effective_xmin;
                effective_catalog_xmin = s->effective_catalog_xmin;
-               invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
-                                          XLogRecPtrIsInvalid(s->data.restart_lsn));
+               invalidated = SlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s);
                SpinLockRelease(&s->mutex);
 
                /* invalidated slots need not apply */
@@ -1225,28 +1231,91 @@ ReplicationSlotReserveWal(void)
        }
 }
 
+
+/*
+ * Report a termination or invalidation message.
+ *
+ * Both the logical-conflict-on-standby and the obsolete-slot cases are handled.
+ */
+static void
+ReportTerminationInvalidation(bool terminating, bool islogical, int pid,
+                                                         NameData slotname, TransactionId *xid,
+                                                         XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
+{
+       StringInfoData err_msg;
+       StringInfoData err_detail;
+       bool            hint = false;
+
+       initStringInfo(&err_msg);
+       initStringInfo(&err_detail);
+
+       if (terminating)
+               appendStringInfo(&err_msg, _("terminating process %d to release replication slot \"%s\""),
+                                                pid,
+                                                NameStr(slotname));
+       else
+               appendStringInfo(&err_msg, _("invalidating"));
+
+       if (islogical)
+       {
+               if (terminating)
+                       appendStringInfo(&err_msg, _(" because it conflicts with recovery"));
+
+               if (TransactionIdIsValid(*xid))
+                       appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."), *xid);
+               else
+                       appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server."));
+       }
+       else
+       {
+               if (!terminating)
+                       appendStringInfo(&err_msg, _(" obsolete replication"));
+
+               appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
+                                                LSN_FORMAT_ARGS(restart_lsn),
+                                                (unsigned long long) (oldestLSN - restart_lsn));
+
+               hint = true;
+       }
+
+       if (!terminating)
+               appendStringInfo(&err_msg, _(" slot \"%s\" because it conflicts with recovery"),
+                                                NameStr(slotname));
+
+       ereport(LOG,
+                       errmsg("%s", err_msg.data),
+                       errdetail("%s", err_detail.data),
                        hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
+
+       pfree(err_msg.data);
+       pfree(err_detail.data);
+}
+
 /*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and marks it invalid, if necessary and possible.
  *
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
  *
- * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
+ * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
  *
  * This is inherently racy, because we release the LWLock
  * for syscalls, so caller must restart if we return true.
  */
 static bool
 InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
-                                                          bool *invalidated)
+                                                          bool *invalidated, TransactionId *xid)
 {
        int                     last_signaled_pid = 0;
        bool            released_lock = false;
+       bool            islogical;
 
        for (;;)
        {
                XLogRecPtr      restart_lsn;
+
                NameData        slotname;
                int                     active_pid = 0;
 
@@ -1263,19 +1332,22 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
                 * Check if the slot needs to be invalidated. If it needs to be
                 * invalidated, and is not currently acquired, acquire it and mark it
                 * as having been invalidated.  We do this with the spinlock held to
-                * avoid race conditions -- for example the restart_lsn could move
-                * forward, or the slot could be dropped.
+                * avoid race conditions -- for example the restart_lsn (or the
+                * xmin(s)) could move forward, or the slot could be dropped.
                 */
                SpinLockAcquire(&s->mutex);
 
                restart_lsn = s->data.restart_lsn;
 
                /*
-                * If the slot is already invalid or is fresh enough, we don't need to
-                * do anything.
+                * If the slot is already invalid or is a non-conflicting slot, we
+                * don't need to do anything.
                 */
-               if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+               islogical = xid ? true : false;
+
+               if (SlotIsInvalid(s, islogical) || SlotIsNotConflicting(s, islogical, xid, &oldestLSN))
                {
+                       /* not forcing invalidation, so nothing more to do */
                        SpinLockRelease(&s->mutex);
                        if (released_lock)
                                LWLockRelease(ReplicationSlotControlLock);
@@ -1294,9 +1366,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
                {
                        MyReplicationSlot = s;
                        s->active_pid = MyProcPid;
-                       s->data.invalidated_at = restart_lsn;
-                       s->data.restart_lsn = InvalidXLogRecPtr;
-
+                       if (xid)
+                       {
+                               s->data.xmin = InvalidTransactionId;
+                               s->data.catalog_xmin = InvalidTransactionId;
+                       }
+                       else
+                       {
+                               s->data.invalidated_at = restart_lsn;
+                               s->data.restart_lsn = InvalidXLogRecPtr;
+                       }
                        /* Let caller know */
                        *invalidated = true;
                }
@@ -1329,15 +1408,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
                         */
                        if (last_signaled_pid != active_pid)
                        {
-                               ereport(LOG,
-                                               errmsg("terminating process %d to release replication slot \"%s\"",
-                                                          active_pid, NameStr(slotname)),
-                                               errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-                                                                 LSN_FORMAT_ARGS(restart_lsn),
-                                                                 (unsigned long long) (oldestLSN - restart_lsn)),
-                                               errhint("You might need to increase max_slot_wal_keep_size."));
-
-                               (void) kill(active_pid, SIGTERM);
+                               ReportTerminationInvalidation(true, islogical, active_pid,
+                                                                                         slotname, xid, restart_lsn,
+                                                                                         oldestLSN);
+
+                               if (islogical)
+                                       (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
+                               else
+                                       (void) kill(active_pid, SIGTERM);
+
                                last_signaled_pid = active_pid;
                        }
 
@@ -1370,14 +1449,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
                        ReplicationSlotMarkDirty();
                        ReplicationSlotSave();
                        ReplicationSlotRelease();
+                       pgstat_drop_replslot(s);
 
-                       ereport(LOG,
-                                       errmsg("invalidating obsolete replication slot \"%s\"",
-                                                  NameStr(slotname)),
-                                       errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-                                                         LSN_FORMAT_ARGS(restart_lsn),
-                                                         (unsigned long long) (oldestLSN - restart_lsn)),
-                                       errhint("You might need to increase max_slot_wal_keep_size."));
+                       ReportTerminationInvalidation(false, islogical, active_pid,
+                                                                                 slotname, xid, restart_lsn,
+                                                                                 oldestLSN);
 
                        /* done with this slot for now */
                        break;
@@ -1390,20 +1466,36 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 }
 
 /*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate obsolete slots or resolve recovery conflicts with logical slots.
+ *
+ * Obsolete case (i.e. xid is NULL):
  *
- * Returns true when any slot have got invalidated.
+ *      Mark any slot that points to an LSN older than the given segment
+ *      as invalid; it requires WAL that's about to be removed.
+ *      invalidated is set to true when any slot has been invalidated.
  *
- * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ * Logical replication slot case:
+ *
+ *      When *xid is valid, we are about to remove rows older than xid, so we
+ *      need to invalidate slots that depend on seeing those rows.
+ *      When *xid is invalid, invalidate all logical slots.  This is required
+ *      when the primary's wal_level is set back to replica, since existing
+ *      logical slots then need to be invalidated.
  */
 bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid, TransactionId *xid)
 {
-       XLogRecPtr      oldestLSN;
+
+       XLogRecPtr      oldestLSN = InvalidXLogRecPtr;
        bool            invalidated = false;
 
-       XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+       Assert(max_replication_slots >= 0);
+
+       if (max_replication_slots == 0)
+               return invalidated;
+
+       if (!xid)
+               XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
 restart:
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1414,21 +1506,35 @@ restart:
                if (!s->in_use)
                        continue;
 
-               if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
+               if (xid)
                {
-                       /* if the lock was released, start from scratch */
-                       goto restart;
+                       /* we are only dealing with *logical* slot conflicts */
+                       if (!SlotIsLogical(s))
+                               continue;
+
+                       /*
+                        * Not the database of interest, and this is not an
+                        * all-database invalidation, so skip.
+                        */
+                       if (s->data.database != dboid && TransactionIdIsValid(*xid))
+                               continue;
                }
+
+               if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated, xid))
+                       goto restart;
        }
+
        LWLockRelease(ReplicationSlotControlLock);
 
        /*
-        * If any slots have been invalidated, recalculate the resource limits.
+        * If any slots have been invalidated, recalculate the required xmin and
+        * the required lsn (if appropriate).
         */
        if (invalidated)
        {
                ReplicationSlotsComputeRequiredXmin(false);
-               ReplicationSlotsComputeRequiredLSN();
+               if (!xid)
+                       ReplicationSlotsComputeRequiredLSN();
        }
 
        return invalidated;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2f3c964824..015d276fd9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -319,8 +319,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                 * certain that the slot has been invalidated.  Otherwise, test
                 * availability from restart_lsn.
                 */
-               if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
-                       !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
+               if (ObsoleteSlotIsInvalid(slot, true))
                        walstate = WALAVAIL_REMOVED;
                else
                        walstate = GetWALAvailability(slot_contents.data.restart_lsn);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75e8363e24..b686691ca2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1253,6 +1253,13 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        ReplicationSlotAcquire(cmd->slotname, true);
 
+       if (LogicalReplicationSlotIsInvalid(MyReplicationSlot))
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("cannot read from logical replication slot \"%s\"",
+                                               cmd->slotname),
+                                errdetail("This slot has been invalidated because it was conflicting with recovery.")));
+
        if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                ereport(ERROR,
                                
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 395b2cf690..c85cb5cc18 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
                RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+       if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+               RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
                RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 9f56b4e95c..c62245afc7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -24,6 +24,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  */
 void
 ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+                                                                       bool isCatalogRel,
                                                                        RelFileLocator locator)
 {
        VirtualTransactionId *backends;
@@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
                                                                                   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                   true);
+
+       if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+               InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, locator.dbOid, &snapshotConflictHorizon);
 }
 
 /*
@@ -499,6 +504,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
  */
 void
 ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
+                                                                                  bool isCatalogRel,
                                                                                   RelFileLocator locator)
 {
        /*
@@ -517,7 +523,9 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
                TransactionId truncated;
 
                truncated = XidFromFullTransactionId(snapshotConflictHorizon);
-               ResolveRecoveryConflictWithSnapshot(truncated, locator);
+               ResolveRecoveryConflictWithSnapshot(truncated,
+                                                                                       isCatalogRel,
+                                                                                       locator);
        }
 }
 
@@ -1478,6 +1486,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        reasonDesc = _("recovery conflict on snapshot");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       reasonDesc = _("recovery conflict on replication slot");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        reasonDesc = _("recovery conflict on buffer deadlock");
                        break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a10ecbaf50..fb2de8542d 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        errdetail("User query might have needed to see row versions that must be removed.");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       errdetail("User was using a logical replication slot that must be invalidated.");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        errdetail("User transaction caused buffer deadlock with recovery.");
                        break;
@@ -3137,6 +3140,31 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
                                /* Intentional fall through to session cancel */
                                /* FALLTHROUGH */
 
+                       case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+
+                               /*
+                                * For conflicts that require a logical slot to be
+                                * invalidated, the requirement is for the signal receiver
+                                * to release the slot, so that it can be invalidated by
+                                * the signal sender.  So for normal backends, the
+                                * transaction should be aborted, just like for other
+                                * recovery conflicts.  But if it's a walsender on a
+                                * standby, we don't want to go through the following
+                                * IsTransactionOrTransactionBlock() check, so break here.
+                                */
+                               if (am_cascading_walsender &&
+                                       reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
+                                       MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
+                               {
+                                       RecoveryConflictPending = true;
+                                       QueryCancelPending = true;
+                                       InterruptPending = true;
+                                       break;
+                               }
+
+                               /* Intentional fall through to session cancel */
+                               /* FALLTHROUGH */
+
                        case PROCSIG_RECOVERY_CONFLICT_DATABASE:
                                RecoveryConflictPending = true;
                                ProcDiePending = true;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8872c80cdf..782bf658c3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -168,6 +168,60 @@ typedef struct ReplicationSlot
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
 #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
 
+static inline bool
+ObsoleteSlotIsInvalid(ReplicationSlot *s, bool check_invalidated_at)
+{
+       if (check_invalidated_at)
+               return (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
+                               XLogRecPtrIsInvalid(s->data.restart_lsn));
+       else
+               return (XLogRecPtrIsInvalid(s->data.restart_lsn));
+}
+
+static inline bool
+LogicalReplicationSlotIsInvalid(ReplicationSlot *s)
+{
+       return (!TransactionIdIsValid(s->data.xmin) &&
+                       !TransactionIdIsValid(s->data.catalog_xmin));
+}
+
+static inline bool
+SlotIsInvalid(ReplicationSlot *s, bool islogical)
+{
+       if (islogical)
+               return LogicalReplicationSlotIsInvalid(s);
+       else
+               return ObsoleteSlotIsInvalid(s, false);
+}
+
+static inline bool
+LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
+{
+       TransactionId slot_xmin;
+       TransactionId slot_catalog_xmin;
+
+       slot_xmin = s->data.xmin;
+       slot_catalog_xmin = s->data.catalog_xmin;
+
+       return ((TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid)) ||
+                       (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)));
+}
+
+static inline bool
+SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+       return (s->data.restart_lsn >= oldestLSN);
+}
+
+static inline bool
+SlotIsNotConflicting(ReplicationSlot *s, bool islogical, TransactionId *xid, XLogRecPtr *oldestLSN)
+{
+       if (islogical)
+               return (TransactionIdIsValid(*xid) && !LogicalReplicationSlotXidsConflict(s, *xid));
+       else
+               return SlotIsFreshEnough(s, *oldestLSN);
+}
+
 /*
  * Shared memory control area for all of replication slots.
  */
@@ -215,7 +269,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid 
dboid, TransactionId *xid);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int     ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
@@ -227,5 +281,6 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason);
 
 #endif                                                 /* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 905af2231b..2f52100b00 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -42,6 +42,7 @@ typedef enum
        PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
        PROCSIG_RECOVERY_CONFLICT_LOCK,
        PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+       PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
        PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
        PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2effdea126..41f4dc372e 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+                                                                                               bool isCatalogRel,
                                                                                                RelFileLocator locator);
 extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
+                                                                                                          bool isCatalogRel,
                                                                                                           RelFileLocator locator);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
-- 
2.34.1
