Hi,
On 4/4/23 7:53 PM, Andres Freund wrote:
Hi,
On 2023-04-04 18:54:33 +0200, Drouvot, Bertrand wrote:
if (check_on_xid)
{
if (terminating)
appendStringInfo(&err_msg, _("terminating process %d to release replication slot
\"%s\" because it conflicts with recovery"),
pid,
NameStr(slotname));
FWIW, I would just use exactly the same error message as today here.
errmsg("terminating process %d to release
replication slot \"%s\"",
active_pid,
NameStr(slotname)),
This is accurate for both the existing and the new case. Then there's no need
to put that string into a stringinfo either.
Right, thanks! Did it that way in V60 attached.
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 4861568b10fe2187d38f2997ad916a984918aa5b Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 14:08:11 +0000
Subject: [PATCH v60 6/6] Doc changes describing details about logical
decoding.
Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes
Mello
---
doc/src/sgml/logicaldecoding.sgml | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
100.0% doc/src/sgml/
diff --git a/doc/src/sgml/logicaldecoding.sgml
b/doc/src/sgml/logicaldecoding.sgml
index 4e912b4bd4..3da254ed1f 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,28 @@ postgres=# select * from
pg_logical_slot_get_changes('regression_slot', NULL, NU
may consume changes from a slot at any given time.
</para>
+ <para>
+ A logical replication slot can also be created on a hot standby. To
prevent
+ <command>VACUUM</command> from removing required rows from the system
+ catalogs, <varname>hot_standby_feedback</varname> should be set on the
+ standby. In spite of that, if any required rows get removed, the slot gets
+ invalidated. It's highly recommended to use a physical slot between the
primary
+ and the standby. Otherwise, hot_standby_feedback will work, but only
while the
+ connection is alive (for example a node restart would break it). Existing
+ logical slots on standby also get invalidated if wal_level on primary is
reduced to
+ less than 'logical'.
+ </para>
+
+ <para>
+ For a logical slot to be created, it builds a historic snapshot, for which
+ information of all the currently running transactions is essential. On
+ primary, this information is available, but on standby, this information
+ has to be obtained from primary. So, slot creation may wait for some
+ activity to happen on the primary. If the primary is idle, creating a
+ logical slot on standby may take a noticeable time. One option to speed it
+ is to call the <function>pg_log_standby_snapshot</function> on the
primary.
+ </para>
+
<caution>
<para>
Replication slots persist across crashes and know nothing about the state
--
2.34.1
From 3dd93d8dbc601accedceae199e539ba74252e092 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 09:04:12 +0000
Subject: [PATCH v60 5/6] New TAP test for logical decoding on standby.
In addition to the new TAP test, this commit introduces a new
pg_log_standby_snapshot()
function.
The idea is to be able to take a snapshot of running transactions and write this
to WAL without requesting for a (costly) checkpoint.
Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes
Mello
---
doc/src/sgml/func.sgml | 15 +
src/backend/access/transam/xlogfuncs.c | 32 +
src/backend/catalog/system_functions.sql | 2 +
src/include/catalog/pg_proc.dat | 3 +
src/test/perl/PostgreSQL/Test/Cluster.pm | 37 +
src/test/recovery/meson.build | 1 +
.../t/035_standby_logical_decoding.pl | 705 ++++++++++++++++++
7 files changed, 795 insertions(+)
3.1% src/backend/
4.0% src/test/perl/PostgreSQL/Test/
89.7% src/test/recovery/t/
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 918a492234..939fb8019f 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27034,6 +27034,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number *
ps.setting::int + :offset
prepared with <xref linkend="sql-prepare-transaction"/>.
</para></entry>
</row>
+ <row>
+ <entry role="func_table_entry"><para role="func_signature">
+ <indexterm>
+ <primary>pg_log_standby_snapshot</primary>
+ </indexterm>
+ <function>pg_log_standby_snapshot</function> ()
+ <returnvalue>pg_lsn</returnvalue>
+ </para>
+ <para>
+ Take a snapshot of running transactions and write this to WAL without
+ having to wait bgwriter or checkpointer to log one. This one is useful
for
+ logical decoding on standby for which logical slot creation is hanging
+ until such a record is replayed on the standby.
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/src/backend/access/transam/xlogfuncs.c
b/src/backend/access/transam/xlogfuncs.c
index c07daa874f..481e9a47da 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -38,6 +38,7 @@
#include "utils/pg_lsn.h"
#include "utils/timestamp.h"
#include "utils/tuplestore.h"
+#include "storage/standby.h"
/*
* Backup-related variables.
@@ -196,6 +197,37 @@ pg_switch_wal(PG_FUNCTION_ARGS)
PG_RETURN_LSN(switchpoint);
}
+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr recptr;
+
+ if (RecoveryInProgress())
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is in progress"),
+ errhint("pg_log_standby_snapshot() cannot be
executed during recovery.")));
+
+ if (!XLogStandbyInfoActive())
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("wal_level is not in desired state"),
+ errhint("wal_level has to be >=
WAL_LEVEL_REPLICA.")));
+
+ recptr = LogStandbySnapshot();
+
+ /*
+ * As a convenience, return the WAL location of the last inserted record
+ */
+ PG_RETURN_LSN(recptr);
+}
+
/*
* pg_create_restore_point: a named point for restore
*
diff --git a/src/backend/catalog/system_functions.sql
b/src/backend/catalog/system_functions.sql
index 83ca893444..b7c65ea37d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text)
FROM public;
REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bcbae9036d..284138727e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6426,6 +6426,9 @@
{ oid => '2848', descr => 'switch to new wal file',
proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn',
proargtypes => '', prosrc => 'pg_switch_wal' },
+{ oid => '9658', descr => 'log details of the current snapshot to WAL',
+ proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype =>
'pg_lsn',
+ proargtypes => '', prosrc => 'pg_log_standby_snapshot' },
{ oid => '3098', descr => 'create a named restore point',
proname => 'pg_create_restore_point', provolatile => 'v',
prorettype => 'pg_lsn', proargtypes => 'text',
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm
b/src/test/perl/PostgreSQL/Test/Cluster.pm
index a3aef8b5e9..62376de602 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3076,6 +3076,43 @@ $SIG{TERM} = $SIG{INT} = sub {
=pod
+=item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+ my ($self, $primary, $slot_name, $dbname) = @_;
+ my ($stdout, $stderr);
+
+ my $handle;
+
+ $handle = IPC::Run::start(['pg_recvlogical', '-d',
$self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name,
'--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+ # Once slot restart_lsn is created, the standby looks for
xl_running_xacts
+ # WAL record from the restart_lsn onwards. So firstly, wait until the
slot
+ # restart_lsn is evaluated.
+
+ $self->poll_query_until(
+ 'postgres', qq[
+ SELECT restart_lsn IS NOT NULL
+ FROM pg_catalog.pg_replication_slots WHERE slot_name =
'$slot_name'
+ ]) or die "timed out waiting for logical slot to calculate its
restart_lsn";
+
+ # Now arrange for the xl_running_xacts record for which pg_recvlogical
+ # is waiting.
+ $primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+
+ $handle->finish();
+
+ is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on
standby created')
+ or die "could not create slot" . $slot_name;
+}
+
+=pod
+
=back
=cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 59465b97f3..e834ad5e0d 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -40,6 +40,7 @@ tests += {
't/032_relfilenode_reuse.pl',
't/033_replay_tsp_drops.pl',
't/034_create_database.pl',
+ 't/035_standby_logical_decoding.pl',
],
},
}
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl
b/src/test/recovery/t/035_standby_logical_decoding.pl
new file mode 100644
index 0000000000..0822012e9d
--- /dev/null
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -0,0 +1,705 @@
+# logical decoding on standby : test logical decoding,
+# recovery conflict and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 67;
+
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret,
$handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $node_cascading_standby =
PostgreSQL::Test::Cluster->new('cascading_standby');
+my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $res;
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+my $standby_physical_slotname = 'standby_physical';
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+ my ($node, $pat, $off) = @_;
+
+ $off = 0 unless defined $off;
+ my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+ return 0 if (length($log) <= $off);
+
+ $log = substr($log, $off);
+
+ return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+ my ($node, $slotname, $check_expr) = @_;
+
+ $node->poll_query_until(
+ 'postgres', qq[
+ SELECT $check_expr
+ FROM pg_catalog.pg_replication_slots
+ WHERE slot_name = '$slotname';
+ ]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+ my ($node) = @_;
+ $node->create_logical_slot_on_standby($node_primary, 'inactiveslot',
'testdb');
+ $node->create_logical_slot_on_standby($node_primary, 'activeslot',
'testdb');
+}
+
+# Drop the logical slots on standby.
+sub drop_logical_slots
+{
+ $node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('inactiveslot')]);
+ $node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('activeslot')]);
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# In case wait is true we are waiting for an active pid on the 'activeslot'
slot.
+# If wait is not true it means we are testing a known failure scenario.
+sub make_slot_active
+{
+ my ($node, $wait, $to_stdout, $to_stderr) = @_;
+ my $slot_user_handle;
+
+ $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d',
$node->connstr('testdb'), '-S', 'activeslot', '-o', 'include-xids=0', '-o',
'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout,
'2>', $to_stderr);
+
+ if ($wait)
+ {
+ # make sure activeslot is in use
+ $node->poll_query_until('testdb',
+ "SELECT EXISTS (SELECT 1 FROM pg_replication_slots
WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+ ) or die "slot never became active";
+ }
+ return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+ my ($slot_user_handle, $check_stderr) = @_;
+ my $return;
+
+ # our client should've terminated in response to the walsender error
+ $slot_user_handle->finish;
+ $return = $?;
+ cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+ if ($return) {
+ like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+ }
+
+ return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+ my ($slot_user_handle) = @_;
+
+ is($node_standby->slot('inactiveslot')->{'slot_type'}, '',
'inactiveslot on standby dropped');
+ is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on
standby dropped');
+
+ check_pg_recvlogical_stderr($slot_user_handle, "conflict with
recovery");
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+ my ($hsf, $invalidated) = @_;
+
+ $node_standby->append_conf('postgresql.conf',qq[
+ hot_standby_feedback = $hsf
+ ]);
+
+ $node_standby->reload;
+
+ if ($hsf && $invalidated)
+ {
+ # With hot_standby_feedback on, xmin should advance,
+ # but catalog_xmin should still remain NULL since there is no
logical slot.
+ wait_for_xmins($node_primary, $primary_slotname,
+ "xmin IS NOT NULL AND catalog_xmin IS NULL");
+ }
+ elsif ($hsf)
+ {
+ # With hot_standby_feedback on, xmin and catalog_xmin should
advance.
+ wait_for_xmins($node_primary, $primary_slotname,
+ "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+ }
+ else
+ {
+ # Both should be NULL since hs_feedback is off
+ wait_for_xmins($node_primary, $primary_slotname,
+ "xmin IS NULL AND catalog_xmin IS NULL");
+
+ }
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+ my ($conflicting) = @_;
+
+ if ($conflicting)
+ {
+ $res = $node_standby->safe_psql(
+ 'postgres', qq(
+ select bool_and(conflicting) from
pg_replication_slots;));
+
+ is($res, 't',
+ "Logical slots are reported as conflicting");
+ }
+ else
+ {
+ $res = $node_standby->safe_psql(
+ 'postgres', qq(
+ select bool_or(conflicting) from
pg_replication_slots;));
+
+ is($res, 'f',
+ "Logical slots are reported as non conflicting");
+ }
+}
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM
pg_create_physical_replication_slot('$primary_slotname');]);
+
+# Check conflicting is NULL for physical slot
+$res = $node_primary->safe_psql(
+ 'postgres', qq[
+ SELECT conflicting is null FROM pg_replication_slots where
slot_name = '$primary_slotname';]);
+
+is($res, 't',
+ "Physical slot reports conflicting as NULL");
+
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+ $node_primary, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+ qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->safe_psql('testdb', qq[SELECT * FROM
pg_create_physical_replication_slot('$standby_physical_slotname');]);
+
+#######################
+# Initialize cascading standby node
+#######################
+$node_standby->backup($backup_name);
+$node_cascading_standby->init_from_backup(
+ $node_standby, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_cascading_standby->append_conf('postgresql.conf',
+ qq[primary_slot_name = '$standby_physical_slotname']);
+$node_cascading_standby->start;
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y
text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s,
s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $result = $node_standby->safe_psql('testdb',
+ qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+ 14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM
generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+ qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+ "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL)
ORDER BY lsn DESC LIMIT 1;"
+);
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM
generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby);
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+ 'testdb', 'activeslot', $endpos, $default_timeout,
+ 'include-xids' => '0',
+ 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+ 'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+ "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name =
'activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+ 'testdb', 'activeslot', $endpos, $default_timeout,
+ 'include-xids' => '0',
+ 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+ 'otherdb',
+ "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL,
NULL) ORDER BY lsn DESC LIMIT 1;"
+ ),
+ 3,
+ 'replaying logical slot from another database fails');
+
+# drop the logical slots
+drop_logical_slots();
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum full on pg_class with hot_standby_feedback turned off on
+# the standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y
text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM full pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+ $node_standby,
+ "invalidating replication slot \"inactiveslot\" because it conflicts with
recovery"),
+ 'inactiveslot slot invalidation is logged with vacuum FULL on pg_class');
+
+ok( find_in_log(
+ $node_standby,
+ "invalidating replication slot \"activeslot\" because it conflicts with
recovery"),
+ 'activeslot slot invalidation is logged with vacuum FULL on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
updated
+ok( $node_standby->poll_query_until(
+ 'postgres',
+ "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts
where datname = 'testdb'", 't'),
+ 'confl_active_logicalslot updated') or die "Timed out waiting
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication
slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1,1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the
standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y
text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+ $node_standby,
+ "invalidating replication slot \"inactiveslot\" because it conflicts with
recovery", $logstart),
+ 'inactiveslot slot invalidation is logged with vacuum on pg_class');
+
+ok( find_in_log(
+ $node_standby,
+ "invalidating replication slot \"activeslot\" because it conflicts with
recovery", $logstart),
+ 'activeslot slot invalidation is logged with vacuum on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
updated
+# we now expect 2 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+ 'postgres',
+ "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts
where datname = 'testdb'", 't'),
+ 'confl_active_logicalslot updated') or die "Timed out waiting
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication
slot \"activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a non catalog table
+# Scenario 3: No conflict expected.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# put hot standby feedback to off
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should not trigger a conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y
text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO conflict_test(x,y) SELECT s,
s::text FROM generate_series(1,4) s;]);
+$node_primary->safe_psql('testdb', qq[UPDATE conflict_test set x=1, y=1;]);
+$node_primary->safe_psql('testdb', 'VACUUM conflict_test;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should not be issued
+ok( !find_in_log(
+ $node_standby,
+ "invalidating slot \"inactiveslot\" because it conflicts with recovery",
$logstart),
+ 'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
+
+ok( !find_in_log(
+ $node_standby,
+ "invalidating slot \"activeslot\" because it conflicts with recovery",
$logstart),
+ 'activeslot slot invalidation is not logged with vacuum on conflict_test');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been
updated
+# we now still expect 2 conflicts reported as the counter persist across
reloads
+ok( $node_standby->poll_query_until(
+ 'postgres',
+ "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts
where datname = 'testdb'", 't'),
+ 'confl_active_logicalslot not updated') or die "Timed out waiting
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as non conflicting in pg_replication_slots
+check_slots_conflicting_status(0);
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 0);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 4: conflict due to on-access pruning.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to trigger an on-access pruning
+# on a relation marked as user_catalog_table.
+change_hot_standby_feedback_and_wait_for_xmins(0,0);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s
char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+ $node_standby,
+ "invalidating replication slot \"inactiveslot\" because it conflicts with
recovery", $logstart),
+ 'inactiveslot slot invalidation is logged with on-access pruning');
+
+ok( find_in_log(
+ $node_standby,
+ "invalidating replication slot \"activeslot\" because it conflicts with
recovery", $logstart),
+ 'activeslot slot invalidation is logged with on-access pruning');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
updated
+# we now expect 3 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+ 'postgres',
+ "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts
where datname = 'testdb'", 't'),
+ 'confl_active_logicalslot updated') or die "Timed out waiting
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication
slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 5: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+ $node_standby,
+ "invalidating replication slot \"inactiveslot\" because it conflicts with
recovery", $logstart),
+ 'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+ $node_standby,
+ "invalidating replication slot \"activeslot\" because it conflicts with
recovery", $logstart),
+ 'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
updated
+# we now expect 3 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+ 'postgres',
+ "select (confl_active_logicalslot = 4) from pg_stat_database_conflicts
where datname = 'testdb'", 't'),
+ 'confl_active_logicalslot updated') or die "Timed out waiting
confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level at least
logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires
wal_level to be at least logical on the primary server");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_replay_catchup($node_standby);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication
slot \"activeslot\"");
+
+##################################################
+# DROP DATABASE should drops it's slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot',
'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+is($node_standby->safe_psql('postgres',
+ q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]),
'f',
+ 'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+ 'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_standby->reload;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y
text);]);
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# create the logical slots on the cascading standby too
+create_logical_slots($node_cascading_standby);
+
+# Make slots actives
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 1,
\$cascading_stdout, \$cascading_stderr);
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM
generate_series(1,4) s;]
+);
+
+# Wait for both standbys to catchup
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM
generate_series(5,7) s;]
+);
+
+# Wait for the cascading standby to catchup
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+ qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on
promoted standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion
+my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+ok( pump_until(
+ $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
+ 'got 2 COMMIT from pg_recvlogical output');
+
+chomp($stdout);
+is($stdout, $expected,
+ 'got same expected output from pg_recvlogical decoding session');
+
+# check that we are decoding pre and post promotion inserted rows on the
cascading standby
+$stdout_sql = $node_cascading_standby->safe_psql('testdb',
+ qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on
cascading standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion on the cascading
standby
+ok( pump_until(
+ $cascading_handle, $pump_timeout, \$cascading_stdout,
qr/^.*COMMIT.*COMMIT$/s),
+ 'got 2 COMMIT from pg_recvlogical output');
+
+chomp($cascading_stdout);
+is($cascading_stdout, $expected,
+ 'got same expected output from pg_recvlogical decoding session on
cascading standby');
--
2.34.1
From b811e76c4535c30417ac919ed352a023954c589e Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 16:46:09 +0000
Subject: [PATCH v60 4/6] For cascading replication, wake up physical
walsenders separately from logical walsenders.
Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical
walsenders.
Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would have been not applied yet, so logical walsenders were
awakened too early.
Author: Bertrand Drouvot per idea from Jeff Davis and Amit Kapila.
Reviewed-By: Sawada Masahiko, Robert Haas.
---
src/backend/access/transam/xlog.c | 6 ++--
src/backend/access/transam/xlogarchive.c | 2 +-
src/backend/access/transam/xlogrecovery.c | 17 ++++++----
src/backend/replication/walreceiver.c | 2 +-
src/backend/replication/walsender.c | 36 +++++++++++++++++----
src/include/replication/walsender.h | 22 ++++++-------
src/include/replication/walsender_private.h | 3 ++
7 files changed, 59 insertions(+), 29 deletions(-)
26.3% src/backend/access/transam/
53.5% src/backend/replication/
20.0% src/include/replication/
diff --git a/src/backend/access/transam/xlog.c
b/src/backend/access/transam/xlog.c
index 779f5c3711..70ac8fc33b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
+ WalSndWakeupProcessRequests(true, !RecoveryInProgress());
/*
* If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
+ WalSndWakeupProcessRequests(true, !RecoveryInProgress());
/*
* Great, done. To take some work off the critical path, try to
initialize
@@ -5773,7 +5773,7 @@ StartupXLOG(void)
* If there were cascading standby servers connected to us, nudge any
wal
* sender processes to notice that we've been promoted.
*/
- WalSndWakeup();
+ WalSndWakeup(true, true);
/*
* If this was a promotion, request an (online) checkpoint now. This
isn't
diff --git a/src/backend/access/transam/xlogarchive.c
b/src/backend/access/transam/xlogarchive.c
index a0f5aa24b5..f3fb92c8f9 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char
*xlogfname)
* if we restored something other than a WAL segment, but it does no
harm
* either.
*/
- WalSndWakeup();
+ WalSndWakeup(true, false);
}
/*
diff --git a/src/backend/access/transam/xlogrecovery.c
b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..df814e6ff7 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1935,6 +1935,17 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord
*record, TimeLineID *repl
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ /*
+ * Wakeup walsenders:
+ *
+ * - physical walsenders in case of new time line and cascade
+ * replication is allowed.
+ * - logical walsenders in case of new time line or recovery is in
progress
+ * (logical decoding on standby).
+ */
+ WalSndWakeup(switchedTLI && AllowCascadeReplication(),
+ switchedTLI || RecoveryInProgress());
+
/*
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
* receiver so that it notices the updated lastReplayedEndRecPtr and
sends
@@ -1958,12 +1969,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord
*record, TimeLineID *repl
*/
RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
- /*
- * Wake up any walsenders to notice that we are on a new
timeline.
- */
- if (AllowCascadeReplication())
- WalSndWakeup();
-
/* Reset the prefetcher. */
XLogPrefetchReconfigure();
}
diff --git a/src/backend/replication/walreceiver.c
b/src/backend/replication/walreceiver.c
index 685af51d5d..d2aa93734c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
/* Signal the startup process and walsender that new WAL has
arrived */
WakeupRecovery();
if (AllowCascadeReplication())
- WalSndWakeup();
+ WalSndWakeup(true, !RecoveryInProgress());
/* Report XLOG streaming progress in PS display */
if (update_process_title)
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index aeb5f93514..aaa78eed52 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2626,6 +2626,23 @@ InitWalSenderSlot(void)
walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
+
+ /*
+ * The kind assignment is done here and not in
StartReplication()
+ * and StartLogicalReplication(). Indeed, the logical
walsender
+ * needs to read WAL records (like snapshot of running
+ * transactions) during the slot creation. So it needs
to be woken
+ * up based on its kind.
+ *
+ * The kind assignment could also be done in
StartReplication(),
+ * StartLogicalReplication() and
CREATE_REPLICATION_SLOT but it
+ * seems better to set it on one place.
+ */
+ if (MyDatabaseId == InvalidOid)
+ walsnd->kind = REPLICATION_KIND_PHYSICAL;
+ else
+ walsnd->kind = REPLICATION_KIND_LOGICAL;
+
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
@@ -3308,30 +3325,35 @@ WalSndShmemInit(void)
}
/*
- * Wake up all walsenders
+ * Wake up physical, logical or both walsenders kind
+ *
+ * The distinction between physical and logical walsenders is done. This is due
+ * to the fact that for cascading replication we need to wake up physical
+ * walsenders separately from logical walsenders (see the comment before
calling
+ * WalSndWakeup() in ApplyWalRecord() for more details).
*
* This will be called inside critical sections, so throwing an error is not
* advisable.
*/
void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
{
int i;
for (i = 0; i < max_wal_senders; i++)
{
Latch *latch;
+ ReplicationKind kind;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
- /*
- * Get latch pointer with spinlock held, for the unlikely case
that
- * pointer reads aren't atomic (as they're 8 bytes).
- */
+ /* get latch pointer and kind with spinlock helds */
SpinLockAcquire(&walsnd->mutex);
latch = walsnd->latch;
+ kind = walsnd->kind;
SpinLockRelease(&walsnd->mutex);
- if (latch != NULL)
+ if (latch != NULL && ((physical && kind ==
REPLICATION_KIND_PHYSICAL) ||
+ (logical && kind ==
REPLICATION_KIND_LOGICAL)))
SetLatch(latch);
}
}
diff --git a/src/include/replication/walsender.h
b/src/include/replication/walsender.h
index 52bb3e2aae..9df7e50f94 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
extern void WalSndInitStopping(void);
extern void WalSndWaitStopping(void);
extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
/*
* wakeup walsenders if there is work to be done
*/
-#define WalSndWakeupProcessRequests() \
- do
\
- {
\
- if (wake_wal_senders) \
- {
\
- wake_wal_senders = false; \
- if (max_wal_senders > 0) \
- WalSndWakeup(); \
- }
\
- } while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+ if (wake_wal_senders)
+ {
+ wake_wal_senders = false;
+ if (max_wal_senders > 0)
+ WalSndWakeup(physical, logical);
+ }
+}
#endif /* _WALSENDER_H */
diff --git a/src/include/replication/walsender_private.h
b/src/include/replication/walsender_private.h
index 5310e054c4..ff25aa70a8 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -15,6 +15,7 @@
#include "access/xlog.h"
#include "lib/ilist.h"
#include "nodes/nodes.h"
+#include "nodes/replnodes.h"
#include "replication/syncrep.h"
#include "storage/latch.h"
#include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
* Timestamp of the last message received from standby.
*/
TimestampTz replyTime;
+
+ ReplicationKind kind;
} WalSnd;
extern PGDLLIMPORT WalSnd *MyWalSnd;
--
2.34.1
From 5f0d4a60eb9fad8bc4f2fd2c8c168c2d822bece3 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 12:45:20 +0000
Subject: [PATCH v60 3/6] Allow logical decoding on standby.
Allow a logical slot to be created on standby. Restrict its usage
or its creation if wal_level on primary is less than logical.
During slot creation, it's restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on standby waits for an
xl_running_xact record to arrive from primary.
Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes
Mello
---
src/backend/access/transam/xlog.c | 11 +++++
src/backend/replication/logical/decode.c | 22 ++++++++-
src/backend/replication/logical/logical.c | 36 +++++++-------
src/backend/replication/slot.c | 58 ++++++++++++-----------
src/backend/replication/walsender.c | 46 +++++++++++-------
src/include/access/xlog.h | 1 +
6 files changed, 113 insertions(+), 61 deletions(-)
4.6% src/backend/access/transam/
37.7% src/backend/replication/logical/
56.7% src/backend/replication/
diff --git a/src/backend/access/transam/xlog.c
b/src/backend/access/transam/xlog.c
index 10085aa0d6..779f5c3711 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4466,6 +4466,17 @@ LocalProcessControlFile(bool reset)
ReadControlFile();
}
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+ return ControlFile->wal_level;
+}
+
/*
* Initialization of shared memory for XLOG
*/
diff --git a/src/backend/replication/logical/decode.c
b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..b37b91bbe0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer
*buf)
* can restart from there.
*/
break;
+ case XLOG_PARAMETER_CHANGE:
+ {
+ xl_parameter_change *xlrec =
+ (xl_parameter_change *)
XLogRecGetData(buf->record);
+
+ /*
+ * If wal_level on primary is reduced to less
than logical,
+ * then we want to prevent existing logical
slots from being
+ * used. Existing logical slots on standby get
invalidated
+ * when this WAL record is replayed; and
further, slot
+ * creation fails when the wal level is not
sufficient; but
+ * all these operations are not synchronized,
so a logical
+ * slot may creep in while the wal_level is
being reduced.
+ * Hence this extra check.
+ */
+ if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical
decoding on standby requires wal_level to be at least logical on the primary
server")));
+ break;
+ }
case XLOG_NOOP:
case XLOG_NEXTOID:
case XLOG_SWITCH:
case XLOG_BACKUP_END:
- case XLOG_PARAMETER_CHANGE:
case XLOG_RESTORE_POINT:
case XLOG_FPW_CHANGE:
case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c
b/src/backend/replication/logical/logical.c
index c3ec97a0a6..60a5008b6d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding requires a database
connection")));
- /* ----
- * TODO: We got to change that someday soon...
- *
- * There's basically three things missing to allow this:
- * 1) We need to be able to correctly and quickly identify the timeline
a
- * LSN belongs to
- * 2) We need to force hot_standby_feedback to be enabled at all times
so
- * the primary cannot remove rows we need.
- * 3) support dropping replication slots referring to a database, in
- * dbase_redo. There can't be any active ones due to HS recovery
- * conflicts, so that should be relatively easy.
- * ----
- */
if (RecoveryInProgress())
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("logical decoding cannot be used while
in recovery")));
+ {
+ /*
+ * This check may have race conditions, but whenever
+ * XLOG_PARAMETER_CHANGE indicates that wal_level has changed,
we
+ * verify that there are no existing logical replication slots.
And to
+ * avoid races around creating a new slot,
+ * CheckLogicalDecodingRequirements() is called once before
creating
+ * the slot, and once when logical decoding is initially
starting up.
+ */
+ if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on standby
requires wal_level to be at least logical on the primary server")));
+ }
}
/*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
LogicalDecodingContext *ctx;
MemoryContext old_context;
+ /*
+ * On standby, this check is also required while creating the slot.
Check
+ * the comments in this function.
+ */
+ CheckLogicalDecodingRequirements();
+
/* shorter lines... */
slot = MyReplicationSlot;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index aacb75bebf..efae9588f3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -41,6 +41,7 @@
#include "access/transam.h"
#include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
#include "common/file_utils.h"
#include "common/string.h"
#include "miscadmin.h"
@@ -1183,37 +1184,28 @@ ReplicationSlotReserveWal(void)
/*
* For logical slots log a standby snapshot and start logical
decoding
* at exactly that position. That allows the slot to start up
more
- * quickly.
+ * quickly. But on a standby we cannot do WAL writes, so just
use the
+ * replay pointer; effectively, an attempt to create a logical
slot on
+ * standby will cause it to wait for an xl_running_xact record
to be
+ * logged independently on the primary, so that a snapshot can
be
+ * built using the record.
*
- * That's not needed (or indeed helpful) for physical slots as
they'll
- * start replay at the last logged checkpoint anyway. Instead
return
- * the location of the last redo LSN. While that slightly
increases
- * the chance that we have to retry, it's where a base backup
has to
- * start replay at.
+ * None of this is needed (or indeed helpful) for physical
slots as
+ * they'll start replay at the last logged checkpoint anyway.
Instead
+ * return the location of the last redo LSN. While that slightly
+ * increases the chance that we have to retry, it's where a base
+ * backup has to start replay at.
*/
- if (!RecoveryInProgress() && SlotIsLogical(slot))
- {
- XLogRecPtr flushptr;
-
- /* start at current insert position */
+ if (SlotIsPhysical(slot))
+ restart_lsn = GetRedoRecPtr();
+ else if (RecoveryInProgress())
+ restart_lsn = GetXLogReplayRecPtr(NULL);
+ else
restart_lsn = GetXLogInsertRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
-
- /* make sure we have enough information to start */
- flushptr = LogStandbySnapshot();
- /* and make sure it's fsynced to disk */
- XLogFlush(flushptr);
- }
- else
- {
- restart_lsn = GetRedoRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
- }
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
/* prevent WAL removal as fast as possible */
ReplicationSlotsComputeRequiredLSN();
@@ -1229,8 +1221,18 @@ ReplicationSlotReserveWal(void)
if (XLogGetLastRemovedSegno() < segno)
break;
}
-}
+ if (!RecoveryInProgress() && SlotIsLogical(slot))
+ {
+ XLogRecPtr flushptr;
+
+ /* make sure we have enough information to start */
+ flushptr = LogStandbySnapshot();
+
+ /* and make sure it's fsynced to disk */
+ XLogFlush(flushptr);
+ }
+}
/*
* Report terminating or conflicting message.
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index b686691ca2..aeb5f93514 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,32 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr
targetPagePtr, int req
int count;
WALReadError errinfo;
XLogSegNo segno;
- TimeLineID currTLI = GetWALInsertionTimeLine();
+ TimeLineID currTLI;
/*
- * Since logical decoding is only permitted on a primary server, we know
- * that the current timeline ID can't be changing any more. If we did
this
- * on a standby, we'd have to worry about the values we compute here
- * becoming invalid due to a promotion or timeline change.
+ * Since logical decoding is also permitted on a standby server, we need
+ * to check if the server is in recovery to decide how to get the
current
+ * timeline ID (so that it also cover the promotion or timeline change
+ * cases).
*/
+
+ /* make sure we have enough WAL available */
+ flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+ /* the standby could have been promoted, so check if still in recovery
*/
+ am_cascading_walsender = RecoveryInProgress();
+
+ if (am_cascading_walsender)
+ GetXLogReplayRecPtr(&currTLI);
+ else
+ currTLI = GetWALInsertionTimeLine();
+
XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
sendTimeLineIsHistoric = (state->currTLI != currTLI);
sendTimeLine = state->currTLI;
sendTimeLineValidUpto = state->currTLIValidUntil;
sendTimeLineNextTLI = state->nextTLI;
- /* make sure we have enough WAL available */
- flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
/* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen)
return -1;
@@ -937,9 +946,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr
targetPagePtr, int req
cur_page,
targetPagePtr,
XLOG_BLCKSZ,
- state->seg.ws_tli, /* Pass the current TLI
because only
- *
WalSndSegmentOpen controls whether new
- * TLI
is needed. */
+ currTLI, /* Pass the current TLI
because only
+ *
WalSndSegmentOpen controls whether new TLI
+ * is needed. */
&errinfo))
WALReadRaiseError(&errinfo);
@@ -3073,10 +3082,14 @@ XLogSendLogical(void)
* If first time through in this session, initialize flushPtr.
Otherwise,
* we only need to update flushPtr if EndRecPtr is past it.
*/
- if (flushPtr == InvalidXLogRecPtr)
- flushPtr = GetFlushRecPtr(NULL);
- else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
- flushPtr = GetFlushRecPtr(NULL);
+ if (flushPtr == InvalidXLogRecPtr ||
+ logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+ {
+ if (am_cascading_walsender)
+ flushPtr = GetStandbyFlushRecPtr(NULL);
+ else
+ flushPtr = GetFlushRecPtr(NULL);
+ }
/* If EndRecPtr is still past our flushPtr, it means we caught up. */
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3167,7 +3180,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
- *tli = replayTLI;
+ if (tli)
+ *tli = replayTLI;
result = replayPtr;
if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738..48ca852381 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
extern void BootStrapXLOG(void);
extern void InitializeWalConsistencyChecking(void);
extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
extern void StartupXLOG(void);
extern void ShutdownXLOG(int code, Datum arg);
extern void CreateCheckPoint(int flags);
--
2.34.1
From abbcd53b92c22205771d36996a1eda6f02d46ee4 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 11:28:30 +0000
Subject: [PATCH v60 2/6] Arrange for a new pg_stat_database_conflicts and
pg_replication_slots field
As we handled logical slot conflicts on standby on the previous commit, we
can expose the conflict in pg_stat_database_conflicts and pg_replication_slots.
Adding:
- confl_active_logicalslot in pg_stat_database_conflicts
- conflicting in pg_replication_slots
to do so.
---
doc/src/sgml/monitoring.sgml | 11 +++++++++++
doc/src/sgml/system-views.sgml | 10 ++++++++++
src/backend/catalog/system_views.sql | 6 ++++--
src/backend/replication/slotfuncs.c | 12 +++++++++++-
src/backend/utils/activity/pgstat_database.c | 4 ++++
src/backend/utils/adt/pgstatfuncs.c | 3 +++
src/include/catalog/pg_proc.dat | 11 ++++++++---
src/include/pgstat.h | 1 +
src/test/regress/expected/rules.out | 8 +++++---
9 files changed, 57 insertions(+), 9 deletions(-)
33.7% doc/src/sgml/
8.1% src/backend/catalog/
13.1% src/backend/replication/
5.9% src/backend/utils/activity/
5.6% src/backend/utils/adt/
24.6% src/include/catalog/
6.9% src/test/regress/expected/
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index fd0ffbb1e0..9fc585b9e7 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4674,6 +4674,17 @@ SELECT pid, wait_event_type, wait_event FROM
pg_stat_activity WHERE wait_event i
deadlocks
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of active logical slots in this database that have been
+ invalidated because they conflict with recovery (note that inactive ones
+ are also invalidated but do not increment this counter)
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index bb1a418450..57b228076e 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
false for physical slots.
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>conflicting</structfield> <type>bool</type>
+ </para>
+ <para>
+ True if this logical slot conflicted with recovery (and so is now
+ invalidated). Always NULL for physical slots.
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/src/backend/catalog/system_views.sql
b/src/backend/catalog/system_views.sql
index 6b098234f8..c25067d06d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -999,7 +999,8 @@ CREATE VIEW pg_replication_slots AS
L.confirmed_flush_lsn,
L.wal_status,
L.safe_wal_size,
- L.two_phase
+ L.two_phase,
+ L.conflicting
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
@@ -1067,7 +1068,8 @@ CREATE VIEW pg_stat_database_conflicts AS
pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
- pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+ pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+ pg_stat_get_db_conflict_logicalslot(D.oid) AS
confl_active_logicalslot
FROM pg_database D;
CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/slotfuncs.c
b/src/backend/replication/slotfuncs.c
index 015d276fd9..6473c73eca 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@@ -403,6 +403,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
+ if (slot_contents.data.database == InvalidOid)
+ nulls[i++] = true;
+ else
+ {
+ if (LogicalReplicationSlotIsInvalid(slot))
+ values[i++] = BoolGetDatum(true);
+ else
+ values[i++] = BoolGetDatum(false);
+ }
+
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/backend/utils/activity/pgstat_database.c
b/src/backend/utils/activity/pgstat_database.c
index 6e650ceaad..7149f22f72 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
dbentry->conflict_bufferpin++;
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ dbentry->conflict_logicalslot++;
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
dbentry->conflict_startup_deadlock++;
break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool
nowait)
PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
PGSTAT_ACCUM_DBCOUNT(conflict_lock);
PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
+ PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
diff --git a/src/backend/utils/adt/pgstatfuncs.c
b/src/backend/utils/adt/pgstatfuncs.c
index eec9f3cf9b..4de60d8aa1 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1066,6 +1066,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
/* pg_stat_get_db_xact_rollback */
PG_STAT_GET_DBENTRY_INT64(xact_rollback)
+/* pg_stat_get_db_conflict_logicalslot */
+PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
Datum
pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
@@ -1099,6 +1101,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
result = (int64) (dbentry->conflict_tablespace +
dbentry->conflict_lock +
dbentry->conflict_snapshot +
+ dbentry->conflict_logicalslot
+
dbentry->conflict_bufferpin +
dbentry->conflict_startup_deadlock);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f9f2642201..bcbae9036d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5605,6 +5605,11 @@
proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '9901',
+ descr => 'statistics: recovery conflicts in database caused by logical
replication slot',
+ proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+ proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+ prosrc => 'pg_stat_get_db_conflict_logicalslot' },
{ oid => '3068',
descr => 'statistics: recovery conflicts in database caused by shared buffer
pin',
proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
@@ -11071,9 +11076,9 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
- proallargtypes =>
'{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames =>
'{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+ proallargtypes =>
'{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames =>
'{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 75d258d921..fa3d326d86 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -331,6 +331,7 @@ typedef struct PgStat_StatDBEntry
PgStat_Counter conflict_tablespace;
PgStat_Counter conflict_lock;
PgStat_Counter conflict_snapshot;
+ PgStat_Counter conflict_logicalslot;
PgStat_Counter conflict_bufferpin;
PgStat_Counter conflict_startup_deadlock;
PgStat_Counter temp_files;
diff --git a/src/test/regress/expected/rules.out
b/src/test/regress/expected/rules.out
index ab1aebfde4..06d3f1f5d3 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
l.confirmed_flush_lsn,
l.wal_status,
l.safe_wal_size,
- l.two_phase
- FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid,
temporary, active, active_pid, xmin, catalog_xmin, restart_lsn,
confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+ l.two_phase,
+ l.conflicting
+ FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid,
temporary, active, active_pid, xmin, catalog_xmin, restart_lsn,
confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,
@@ -1869,7 +1870,8 @@ pg_stat_database_conflicts| SELECT oid AS datid,
pg_stat_get_db_conflict_lock(oid) AS confl_lock,
pg_stat_get_db_conflict_snapshot(oid) AS confl_snapshot,
pg_stat_get_db_conflict_bufferpin(oid) AS confl_bufferpin,
- pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock
+ pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock,
+ pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot
FROM pg_database d;
pg_stat_gssapi| SELECT pid,
gss_auth AS gss_authenticated,
--
2.34.1
From 6879e222dcf35ddf802caa78d0f1f7d118511c61 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 08:57:56 +0000
Subject: [PATCH v60 1/6] Handle logical slot conflicts on standby.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
During WAL replay on standby, when slot conflict is identified,
invalidate such slots. Also do the same thing if wal_level on the primary server
is reduced to below logical and there are existing logical slots
on standby. Introduce a new ProcSignalReason value for slot
conflict recovery.
Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes
Mello,
Bharath Rupireddy, Amit Kapila, Álvaro Herrera
---
src/backend/access/gist/gistxlog.c | 2 +
src/backend/access/hash/hash_xlog.c | 1 +
src/backend/access/heap/heapam.c | 3 +
src/backend/access/nbtree/nbtxlog.c | 2 +
src/backend/access/spgist/spgxlog.c | 1 +
src/backend/access/transam/xlog.c | 21 +-
.../replication/logical/logicalfuncs.c | 13 +-
src/backend/replication/slot.c | 189 ++++++++++++++----
src/backend/replication/slotfuncs.c | 3 +-
src/backend/replication/walsender.c | 7 +
src/backend/storage/ipc/procsignal.c | 3 +
src/backend/storage/ipc/standby.c | 13 +-
src/backend/tcop/postgres.c | 28 +++
src/include/replication/slot.h | 55 ++++-
src/include/storage/procsignal.h | 1 +
src/include/storage/standby.h | 2 +
16 files changed, 293 insertions(+), 51 deletions(-)
7.0% src/backend/access/transam/
5.3% src/backend/replication/logical/
58.1% src/backend/replication/
5.0% src/backend/storage/ipc/
7.8% src/backend/tcop/
13.0% src/include/replication/
3.3% src/
diff --git a/src/backend/access/gist/gistxlog.c
b/src/backend/access/gist/gistxlog.c
index b7678f3c14..9a86fb3fef 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+
xldata->isCatalogRel,
rlocator);
}
@@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
*/
if (InHotStandby)
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+
xlrec->isCatalogRel,
xlrec->locator);
}
diff --git a/src/backend/access/hash/hash_xlog.c
b/src/backend/access/hash/hash_xlog.c
index f2dd9be8d3..e8e06c62a9 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+
xldata->isCatalogRel,
rlocator);
}
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f7d9ce59a4..371e855683 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8717,6 +8717,7 @@ heap_xlog_prune(XLogReaderState *record)
*/
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+
xlrec->isCatalogRel,
rlocator);
/*
@@ -8888,6 +8889,7 @@ heap_xlog_visible(XLogReaderState *record)
*/
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+
xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
rlocator);
/*
@@ -9009,6 +9011,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+
xlrec->isCatalogRel,
rlocator);
}
diff --git a/src/backend/access/nbtree/nbtxlog.c
b/src/backend/access/nbtree/nbtxlog.c
index 414ca4f6de..c87e46ed66 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+
xlrec->isCatalogRel,
rlocator);
}
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
if (InHotStandby)
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+
xlrec->isCatalogRel,
xlrec->locator);
}
diff --git a/src/backend/access/spgist/spgxlog.c
b/src/backend/access/spgist/spgxlog.c
index b071b59c8a..459ac929ba 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+
xldata->isCatalogRel,
locator);
}
diff --git a/src/backend/access/transam/xlog.c
b/src/backend/access/transam/xlog.c
index 46821ad605..10085aa0d6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6806,7 +6806,7 @@ CreateCheckPoint(int flags)
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
KeepLogSeg(recptr, &_logSegNo);
- if (InvalidateObsoleteReplicationSlots(_logSegNo))
+ if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, NULL))
{
/*
* Some slots have been invalidated; recalculate the old-segment
@@ -7250,7 +7250,7 @@ CreateRestartPoint(int flags)
replayPtr = GetXLogReplayRecPtr(&replayTLI);
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
KeepLogSeg(endptr, &_logSegNo);
- if (InvalidateObsoleteReplicationSlots(_logSegNo))
+ if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, NULL))
{
/*
* Some slots have been invalidated; recalculate the old-segment
@@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
/* Update our copy of the parameters in pg_control */
memcpy(&xlrec, XLogRecGetData(record),
sizeof(xl_parameter_change));
+ /*
+ * Invalidate logical slots if we are in hot standby and the
primary
+ * does not have a WAL level sufficient for logical decoding.
No need
+ * to search for potentially conflicting logically slots if
standby is
+ * running with wal_level lower than logical, because in that
case, we
+ * would have either disallowed creation of logical slots or
+ * invalidated existing ones.
+ */
+ if (InRecovery && InHotStandby &&
+ xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+ wal_level >= WAL_LEVEL_LOGICAL)
+ {
+ TransactionId ConflictHorizon = InvalidTransactionId;
+
+ InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr,
InvalidOid, &ConflictHorizon);
+ }
+
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->MaxConnections = xlrec.MaxConnections;
ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/replication/logical/logicalfuncs.c
b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b..575a047e53 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -216,9 +216,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo,
bool confirm, bool bin
/*
* After the sanity checks in CreateDecodingContext, make sure
the
- * restart_lsn is valid. Avoid "cannot get changes" wording in
this
- * errmsg because that'd be confusingly ambiguous about no
changes
- * being available.
+ * restart_lsn is valid or both effective_xmin and catalog_xmin
are
+ * valid. Avoid "cannot get changes" wording in this errmsg
because
+ * that'd be confusingly ambiguous about no changes being
available.
*/
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
ereport(ERROR,
@@ -227,6 +227,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo,
bool confirm, bool bin
NameStr(*name)),
errdetail("This slot has never
previously reserved WAL, or it has been invalidated.")));
+ if (LogicalReplicationSlotIsInvalid(MyReplicationSlot))
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot read from logical
replication slot \"%s\"",
+ NameStr(*name)),
+ errdetail("This slot has been
invalidated because it was conflicting with recovery.")));
+
MemoryContextSwitchTo(oldcontext);
/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2293c0c6fc..aacb75bebf 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -110,6 +110,13 @@ static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
+/* to report termination/invalidation */
+static void ReportTerminationInvalidation(bool terminating, bool check_on_xid,
+
int pid, NameData slotname,
+
TransactionId *xid,
+
XLogRecPtr restart_lsn,
+
XLogRecPtr oldestLSN);
+
/*
* Report shared-memory space needed by ReplicationSlotsShmemInit.
*/
@@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
SpinLockAcquire(&s->mutex);
effective_xmin = s->effective_xmin;
effective_catalog_xmin = s->effective_catalog_xmin;
- invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
-
XLogRecPtrIsInvalid(s->data.restart_lsn));
+ invalidated = ObsoleteSlotIsInvalid(s, true) ||
LogicalReplicationSlotIsInvalid(s);
SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
@@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void)
}
}
+
+/*
+ * Report terminating or conflicting message.
+ *
+ * For both, logical conflict on standby and obsolete slot are handled.
+ */
+static void
+ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
+ NameData slotname,
TransactionId *xid,
+ XLogRecPtr
restart_lsn, XLogRecPtr oldestLSN)
+{
+ StringInfoData err_msg;
+ StringInfoData err_detail;
+ bool hint = false;
+
+ initStringInfo(&err_detail);
+
+ if (check_on_xid)
+ {
+ if (!terminating)
+ {
+ initStringInfo(&err_msg);
+ appendStringInfo(&err_msg, _("invalidating replication
slot \"%s\" because it conflicts with recovery"),
+ NameStr(slotname));
+ }
+
+ if (TransactionIdIsValid(*xid))
+ appendStringInfo(&err_detail, _("The slot conflicted
with xid horizon %u."), *xid);
+ else
+ appendStringInfo(&err_detail, _("Logical decoding on
standby requires wal_level to be at least logical on the primary server"));
+ }
+ else
+ {
+ if (!terminating)
+ {
+ initStringInfo(&err_msg);
+ appendStringInfo(&err_msg, _("invalidating obsolete
replication slot \"%s\""),
+ NameStr(slotname));
+ }
+
+ appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X
exceeds the limit by %llu bytes."),
+ LSN_FORMAT_ARGS(restart_lsn),
+ (unsigned long long)
(oldestLSN - restart_lsn));
+
+ hint = true;
+ }
+
+ ereport(LOG,
+ terminating ? errmsg("terminating process %d to release
replication slot \"%s\"", pid, NameStr(slotname)) :
+ errmsg_internal("%s", err_msg.data),
+ errdetail_internal("%s", err_detail.data),
+ hint ? errhint("You might need to increase
max_slot_wal_keep_size.") : 0);
+
+ if (!terminating)
+ pfree(err_msg.data);
+
+ pfree(err_detail.data);
+}
+
/*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and mark it invalid, if necessary and possible.
*
* Returns whether ReplicationSlotControlLock was released in the interim (and
* in that case we're not holding the lock at return, otherwise we are).
*
- * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
+ * Sets *invalidated true if an obsolete slot was invalidated. (Untouched
otherwise.)
*
* This is inherently racy, because we release the LWLock
* for syscalls, so caller must restart if we return true.
*/
static bool
InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
- bool *invalidated)
+ bool *invalidated,
TransactionId *xid)
{
int last_signaled_pid = 0;
bool released_lock = false;
+ bool check_on_xid;
+
+ check_on_xid = xid ? true : false;
for (;;)
{
XLogRecPtr restart_lsn;
+
NameData slotname;
int active_pid = 0;
@@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s,
XLogRecPtr oldestLSN,
* Check if the slot needs to be invalidated. If it needs to be
* invalidated, and is not currently acquired, acquire it and
mark it
* as having been invalidated. We do this with the spinlock
held to
- * avoid race conditions -- for example the restart_lsn could
move
- * forward, or the slot could be dropped.
+ * avoid race conditions -- for example the restart_lsn (or the
+ * xmin(s) could) move forward or the slot could be dropped.
*/
SpinLockAcquire(&s->mutex);
restart_lsn = s->data.restart_lsn;
/*
- * If the slot is already invalid or is fresh enough, we don't
need to
- * do anything.
+ * If the slot is already invalid or is a non conflicting slot,
we
+ * don't need to do anything.
*/
- if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >=
oldestLSN)
+ if (DoNotInvalidateSlot(s, xid, &oldestLSN))
{
+ /* then, we are not forcing for invalidation */
SpinLockRelease(&s->mutex);
if (released_lock)
LWLockRelease(ReplicationSlotControlLock);
@@ -1294,9 +1365,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s,
XLogRecPtr oldestLSN,
{
MyReplicationSlot = s;
s->active_pid = MyProcPid;
- s->data.invalidated_at = restart_lsn;
- s->data.restart_lsn = InvalidXLogRecPtr;
-
+ if (xid)
+ {
+ s->effective_xmin = InvalidTransactionId;
+ s->data.catalog_xmin = InvalidTransactionId;
+ }
+ else
+ {
+ s->data.invalidated_at = restart_lsn;
+ s->data.restart_lsn = InvalidXLogRecPtr;
+ }
/* Let caller know */
*invalidated = true;
}
@@ -1329,15 +1407,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s,
XLogRecPtr oldestLSN,
*/
if (last_signaled_pid != active_pid)
{
- ereport(LOG,
- errmsg("terminating process %d
to release replication slot \"%s\"",
- active_pid,
NameStr(slotname)),
- errdetail("The slot's
restart_lsn %X/%X exceeds the limit by %llu bytes.",
-
LSN_FORMAT_ARGS(restart_lsn),
- (unsigned
long long) (oldestLSN - restart_lsn)),
- errhint("You might need to
increase max_slot_wal_keep_size."));
-
- (void) kill(active_pid, SIGTERM);
+ ReportTerminationInvalidation(true,
check_on_xid, active_pid,
+
slotname, xid, restart_lsn,
+
oldestLSN);
+
+ if (check_on_xid)
+ (void) SendProcSignal(active_pid,
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
+ else
+ (void) kill(active_pid, SIGTERM);
+
last_signaled_pid = active_pid;
}
@@ -1370,14 +1448,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s,
XLogRecPtr oldestLSN,
ReplicationSlotMarkDirty();
ReplicationSlotSave();
ReplicationSlotRelease();
+ pgstat_drop_replslot(s);
- ereport(LOG,
- errmsg("invalidating obsolete
replication slot \"%s\"",
- NameStr(slotname)),
- errdetail("The slot's restart_lsn %X/%X
exceeds the limit by %llu bytes.",
-
LSN_FORMAT_ARGS(restart_lsn),
- (unsigned long long)
(oldestLSN - restart_lsn)),
- errhint("You might need to increase
max_slot_wal_keep_size."));
+ ReportTerminationInvalidation(false, check_on_xid,
active_pid,
+
slotname, xid, restart_lsn,
+
oldestLSN);
/* done with this slot for now */
break;
@@ -1390,20 +1465,36 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s,
XLogRecPtr oldestLSN,
}
/*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate Obsolete slots or resolve recovery conflicts with logical slots.
*
- * Returns true when any slot have got invalidated.
+ * Obsolete case (aka xid is NULL):
*
- * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ * Mark any slot that points to an LSN older than the given segment
+ * as invalid; it requires WAL that's about to be removed.
+ * invalidated is set to true when any slot have got invalidated.
+ *
+ * Logical replication slot case:
+ *
+ * When xid is valid, it means that we are about to remove rows older
than xid.
+ * Therefore we need to invalidate slots that depend on seeing those rows.
+ * When xid is invalid, invalidate all logical slots. This is required
when the
+ * master wal_level is set back to replica, so existing logical slots
need to
+ * be invalidated.
*/
bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid,
TransactionId *xid)
{
- XLogRecPtr oldestLSN;
+
+ XLogRecPtr oldestLSN = InvalidXLogRecPtr;
bool invalidated = false;
- XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+ Assert(max_replication_slots >= 0);
+
+ if (max_replication_slots == 0)
+ return invalidated;
+
+ if (!xid)
+ XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size,
oldestLSN);
restart:
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1414,21 +1505,35 @@ restart:
if (!s->in_use)
continue;
- if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
+ if (xid)
{
- /* if the lock was released, start from scratch */
- goto restart;
+ /* we are only dealing with *logical* slot conflicts */
+ if (!SlotIsLogical(s))
+ continue;
+
+ /*
+ * not the database of interest and we don't want all
the
+ * database, skip
+ */
+ if (s->data.database != dboid &&
TransactionIdIsValid(*xid))
+ continue;
}
+
+ if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated,
xid))
+ goto restart;
}
+
LWLockRelease(ReplicationSlotControlLock);
/*
- * If any slots have been invalidated, recalculate the resource limits.
+ * If any slots have been invalidated, recalculate the required xmin and
+ * the required lsn (if appropriate).
*/
if (invalidated)
{
ReplicationSlotsComputeRequiredXmin(false);
- ReplicationSlotsComputeRequiredLSN();
+ if (!xid)
+ ReplicationSlotsComputeRequiredLSN();
}
return invalidated;
diff --git a/src/backend/replication/slotfuncs.c
b/src/backend/replication/slotfuncs.c
index 2f3c964824..015d276fd9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -319,8 +319,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
* certain that the slot has been invalidated. Otherwise, test
* availability from restart_lsn.
*/
- if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
- !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
+ if (ObsoleteSlotIsInvalid(slot, true))
walstate = WALAVAIL_REMOVED;
else
walstate =
GetWALAvailability(slot_contents.data.restart_lsn);
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index 75e8363e24..b686691ca2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1253,6 +1253,13 @@ StartLogicalReplication(StartReplicationCmd *cmd)
ReplicationSlotAcquire(cmd->slotname, true);
+ if (LogicalReplicationSlotIsInvalid(MyReplicationSlot))
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot read from logical replication
slot \"%s\"",
+ cmd->slotname),
+ errdetail("This slot has been invalidated
because it was conflicting with recovery.")));
+
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
diff --git a/src/backend/storage/ipc/procsignal.c
b/src/backend/storage/ipc/procsignal.c
index 395b2cf690..c85cb5cc18 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
+ if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
diff --git a/src/backend/storage/ipc/standby.c
b/src/backend/storage/ipc/standby.c
index 9f56b4e95c..c62245afc7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -24,6 +24,7 @@
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
@@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId
*waitlist,
*/
void
ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+ bool
isCatalogRel,
RelFileLocator locator)
{
VirtualTransactionId *backends;
@@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId
snapshotConflictHorizon,
PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
true);
+
+ if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+ InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr,
locator.dbOid, &snapshotConflictHorizon);
}
/*
@@ -499,6 +504,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId
snapshotConflictHorizon,
*/
void
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId
snapshotConflictHorizon,
+
bool isCatalogRel,
RelFileLocator locator)
{
/*
@@ -517,7 +523,9 @@
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
TransactionId truncated;
truncated = XidFromFullTransactionId(snapshotConflictHorizon);
- ResolveRecoveryConflictWithSnapshot(truncated, locator);
+ ResolveRecoveryConflictWithSnapshot(truncated,
+
isCatalogRel,
+
locator);
}
}
@@ -1478,6 +1486,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
reasonDesc = _("recovery conflict on snapshot");
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ reasonDesc = _("recovery conflict on replication slot");
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
reasonDesc = _("recovery conflict on buffer deadlock");
break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a10ecbaf50..fb2de8542d 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
errdetail("User query might have needed to see row
versions that must be removed.");
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ errdetail("User was using the logical slot that must be
dropped.");
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
errdetail("User transaction caused buffer deadlock with
recovery.");
break;
@@ -3137,6 +3140,31 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
/* Intentional fall through to session cancel */
/* FALLTHROUGH */
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+
+ /*
+ * For conflicts that require a logical slot to
be
+ * invalidated, the requirement is for the
signal receiver to
+ * release the slot, so that it could be
invalidated by the
+ * signal sender. So for normal backends, the
transaction
+ * should be aborted, just like for other
recovery conflicts.
+ * But if it's walsender on standby, we don't
want to go
+ * through the following
IsTransactionOrTransactionBlock()
+ * check, so break here.
+ */
+ if (am_cascading_walsender &&
+ reason ==
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
+ MyReplicationSlot &&
SlotIsLogical(MyReplicationSlot))
+ {
+ RecoveryConflictPending = true;
+ QueryCancelPending = true;
+ InterruptPending = true;
+ break;
+ }
+
+ /* Intentional fall through to session cancel */
+ /* FALLTHROUGH */
+
case PROCSIG_RECOVERY_CONFLICT_DATABASE:
RecoveryConflictPending = true;
ProcDiePending = true;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8872c80cdf..914b6aebc3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -168,6 +168,58 @@ typedef struct ReplicationSlot
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
+static inline bool
+ObsoleteSlotIsInvalid(ReplicationSlot *s, bool check_invalidated_at)
+{
+ if (check_invalidated_at)
+ return (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
+ XLogRecPtrIsInvalid(s->data.restart_lsn));
+ else
+ return (XLogRecPtrIsInvalid(s->data.restart_lsn));
+}
+
+static inline bool
+LogicalReplicationSlotIsInvalid(ReplicationSlot *s)
+{
+ return (!TransactionIdIsValid(s->effective_xmin) &&
+ !TransactionIdIsValid(s->data.catalog_xmin));
+}
+
+static inline bool
+LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
+{
+ TransactionId slot_effective_xmin;
+ TransactionId slot_catalog_xmin;
+
+ slot_effective_xmin = s->effective_xmin;
+ slot_catalog_xmin = s->data.catalog_xmin;
+
+ return (((TransactionIdIsValid(slot_effective_xmin) &&
TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) ||
+ (TransactionIdIsValid(slot_catalog_xmin) &&
TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))));
+}
+
+static inline bool
+SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+ return (s->data.restart_lsn >= oldestLSN);
+}
+
+static inline bool
+LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid)
+{
+ return (TransactionIdIsValid(*xid) &&
!LogicalReplicationSlotXidsConflict(s, *xid));
+}
+
+static inline bool
+DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr
*oldestLSN)
+{
+ if (xid)
+ return (LogicalReplicationSlotIsInvalid(s) ||
LogicalSlotIsNotConflicting(s, xid));
+ else
+ return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s,
*oldestLSN));
+
+}
+
/*
* Shared memory control area for all of replication slots.
*/
@@ -215,7 +267,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid
dboid, TransactionId *xid);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool
need_lock);
extern int ReplicationSlotIndex(ReplicationSlot *slot);
extern bool ReplicationSlotName(int index, Name name);
@@ -227,5 +279,6 @@ extern void CheckPointReplicationSlots(void);
extern void CheckSlotRequirements(void);
extern void CheckSlotPermissions(void);
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId
xid, char *reason);
#endif /* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 905af2231b..2f52100b00 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -42,6 +42,7 @@ typedef enum
PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
PROCSIG_RECOVERY_CONFLICT_LOCK,
PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2effdea126..41f4dc372e 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
extern void ShutdownRecoveryTransactionEnvironment(void);
extern void ResolveRecoveryConflictWithSnapshot(TransactionId
snapshotConflictHorizon,
+
bool isCatalogRel,
RelFileLocator locator);
extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId
snapshotConflictHorizon,
+
bool isCatalogRel,
RelFileLocator locator);
extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
--
2.34.1