Hi, On 4/4/23 7:53 PM, Andres Freund wrote:
> Hi,
>
> On 2023-04-04 18:54:33 +0200, Drouvot, Bertrand wrote:
>> if (check_on_xid)
>> {
>>     if (terminating)
>>         appendStringInfo(&err_msg, _("terminating process %d to release replication slot \"%s\" because it conflicts with recovery"),
>>                          pid, NameStr(slotname));
>
> FWIW, I would just use exactly the same error message as today here.
>
>     errmsg("terminating process %d to release replication slot \"%s\"",
>            active_pid, NameStr(slotname)),
>
> This is accurate for both the existing and the new case. Then there's no
> need to put that string into a stringinfo either.
Right, thanks! Did it that way in V60 attached.

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 4861568b10fe2187d38f2997ad916a984918aa5b Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <[email protected]>
Date: Tue, 7 Feb 2023 14:08:11 +0000
Subject: [PATCH v60 6/6] Doc changes describing details about logical
 decoding on standby.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 doc/src/sgml/logicaldecoding.sgml | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)
 100.0% doc/src/sgml/

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 4e912b4bd4..3da254ed1f 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,28 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>

+    <para>
+     A logical replication slot can also be created on a hot standby. To
+     prevent <command>VACUUM</command> from removing required rows from the
+     system catalogs, <varname>hot_standby_feedback</varname> should be set on
+     the standby. If any required rows get removed despite that, the slot gets
+     invalidated. It is highly recommended to use a physical slot between the
+     primary and the standby, because otherwise
+     <varname>hot_standby_feedback</varname> works only while the connection
+     is alive (for example, a node restart would break it). Existing logical
+     slots on the standby also get invalidated if <varname>wal_level</varname>
+     on the primary is reduced to less than <literal>logical</literal>.
+    </para>
+
+    <para>
+     Creating a logical slot requires building a historic snapshot, for which
+     information about all currently running transactions is essential. On the
+     primary, this information is available directly, but on a standby it has
+     to be obtained from the primary. So, slot creation may need to wait for
+     some activity to happen on the primary. If the primary is idle, creating
+     a logical slot on the standby may take a noticeable amount of time. This
+     can be sped up by calling the
+     <function>pg_log_standby_snapshot</function> function on the primary.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
-- 
2.34.1
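To make the documented workflow concrete, here is a minimal SQL sketch (the
slot name is made up; it assumes wal_level = 'logical' on the primary, plus
hot_standby_feedback enabled on the standby as described above):

    -- On the standby: blocks until a running-transactions record is replayed
    SELECT * FROM pg_create_logical_replication_slot('standby_slot', 'test_decoding');

    -- On the otherwise idle primary: unblocks the creation above
    SELECT pg_log_standby_snapshot();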
From 3dd93d8dbc601accedceae199e539ba74252e092 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <[email protected]>
Date: Tue, 7 Feb 2023 09:04:12 +0000
Subject: [PATCH v60 5/6] New TAP test for logical decoding on standby.

In addition to the new TAP test, this commit introduces a new
pg_log_standby_snapshot() function. The idea is to be able to take a
snapshot of running transactions and write this to WAL without
requesting a (costly) checkpoint.

Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 doc/src/sgml/func.sgml                        |  15 +
 src/backend/access/transam/xlogfuncs.c        |  32 +
 src/backend/catalog/system_functions.sql      |   2 +
 src/include/catalog/pg_proc.dat               |   3 +
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 +
 src/test/recovery/meson.build                 |   1 +
 .../t/035_standby_logical_decoding.pl         | 705 ++++++++++++++++++
 7 files changed, 795 insertions(+)
 3.1% src/backend/
 4.0% src/test/perl/PostgreSQL/Test/
 89.7% src/test/recovery/t/

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 918a492234..939fb8019f 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27034,6 +27034,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         prepared with <xref linkend="sql-prepare-transaction"/>.
        </para></entry>
       </row>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write it to WAL, without
+        having to wait for bgwriter or checkpointer to log one. This is
+        useful for logical decoding on a standby, where logical slot creation
+        hangs until such a record has been replayed on the standby.
+       </para></entry>
+      </row>
     </tbody>
    </tgroup>
   </table>

diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index c07daa874f..481e9a47da 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -38,6 +38,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/timestamp.h"
 #include "utils/tuplestore.h"
+#include "storage/standby.h"

 /*
  * Backup-related variables.
@@ -196,6 +197,37 @@ pg_switch_wal(PG_FUNCTION_ARGS)
 	PG_RETURN_LSN(switchpoint);
 }

+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	recptr;
+
+	if (RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is in progress"),
+				 errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
+
+	if (!XLogStandbyInfoActive())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("wal_level is not sufficient"),
+				 errhint("wal_level must be set to \"replica\" or \"logical\" at server start.")));
+
+	recptr = LogStandbySnapshot();
+
+	/*
+	 * As a convenience, return the WAL location of the last inserted record.
+	 */
+	PG_RETURN_LSN(recptr);
+}
+
 /*
  * pg_create_restore_point: a named point for restore
  *

diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 83ca893444..b7c65ea37d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;

 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;

+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;

 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;

diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bcbae9036d..284138727e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6426,6 +6426,9 @@
 { oid => '2848', descr => 'switch to new wal file',
   proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn',
   proargtypes => '', prosrc => 'pg_switch_wal' },
+{ oid => '9658', descr => 'log details of the current snapshot to WAL',
+  proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype => 'pg_lsn',
+  proargtypes => '', prosrc => 'pg_log_standby_snapshot' },
 { oid => '3098', descr => 'create a named restore point',
   proname => 'pg_create_restore_point', provolatile => 'v',
   prorettype => 'pg_lsn', proargtypes => 'text',

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index a3aef8b5e9..62376de602 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3076,6 +3076,43 @@ $SIG{TERM} = $SIG{INT} = sub {

 =pod

+=item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname)
+
+Create a logical replication slot on the given standby.
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+	my ($self, $primary, $slot_name, $dbname) = @_;
+	my ($stdout, $stderr);
+
+	my $handle;
+
+	$handle = IPC::Run::start(
+		['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding',
+		 '-S', $slot_name, '--create-slot'],
+		'>', \$stdout, '2>', \$stderr);
+
+	# Once the slot's restart_lsn is determined, the standby looks for an
+	# xl_running_xacts WAL record from the restart_lsn onwards. So first,
+	# wait until the slot's restart_lsn has been evaluated.
+	$self->poll_query_until(
+		'postgres', qq[
+		SELECT restart_lsn IS NOT NULL
+		FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+	]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+	# Now arrange for the xl_running_xacts record for which pg_recvlogical
+	# is waiting.
+	$primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+
+	$handle->finish();
+
+	is($self->slot($slot_name)->{'slot_type'}, 'logical',
+		$slot_name . ' on standby created')
+	  or die "could not create slot " . $slot_name;
+}
+
+=pod
+
 =back

 =cut

diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 59465b97f3..e834ad5e0d 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
       't/034_create_database.pl',
+      't/035_standby_logical_decoding.pl',
     ],
   },
 }

diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
new file mode 100644
index 0000000000..0822012e9d
--- /dev/null
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -0,0 +1,705 @@
+# Logical decoding on standby: test logical decoding,
+# recovery conflicts and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 67;
+
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret,
+	$handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $res;
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+my $standby_physical_slotname = 'standby_physical';
+
+# Find $pat in the logfile of $node after the $off-th byte.
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from the slot's pg_replication_slots row, after waiting
+# for the given boolean condition to be true, to ensure we've reached a
+# quiescent state.
+sub wait_for_xmins
+{
+	my ($node, $slotname, $check_expr) = @_;
+
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT $check_expr
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = '$slotname';
+	]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+	my ($node) = @_;
+	$node->create_logical_slot_on_standby($node_primary, 'inactiveslot', 'testdb');
+	$node->create_logical_slot_on_standby($node_primary, 'activeslot', 'testdb');
+}
+
+# Drop the logical slots on standby.
+sub drop_logical_slots
+{
+	$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('inactiveslot')]);
+	$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('activeslot')]);
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# If wait is true, wait for an active pid on the 'activeslot' slot.
+# If wait is false, we are testing a known failure scenario.
+sub make_slot_active
+{
+	my ($node, $wait, $to_stdout, $to_stderr) = @_;
+	my $slot_user_handle;
+
+	$slot_user_handle = IPC::Run::start(
+		['pg_recvlogical', '-d', $node->connstr('testdb'), '-S', 'activeslot',
+		 '-o', 'include-xids=0', '-o', 'skip-empty-xacts=1', '--no-loop',
+		 '--start', '-f', '-'],
+		'>', $to_stdout, '2>', $to_stderr);
+
+	if ($wait)
+	{
+		# make sure activeslot is in use
+		$node->poll_query_until('testdb',
+			"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+		) or die "slot never became active";
+	}
+	return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr.
+sub check_pg_recvlogical_stderr
+{
+	my ($slot_user_handle, $check_stderr) = @_;
+	my $return;
+
+	# our client should've terminated in response to the walsender error
+	$slot_user_handle->finish;
+	$return = $?;
+	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+	if ($return)
+	{
+		like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+	}
+
+	return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+	my ($slot_user_handle) = @_;
+
+	is($node_standby->slot('inactiveslot')->{'slot_type'}, '', 'inactiveslot on standby dropped');
+	is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+	check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
+}
+
+# Change hot_standby_feedback on the standby and wait for the xmins of the
+# primary's physical slot to reflect the new setting.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+	my ($hsf, $invalidated) = @_;
+
+	$node_standby->append_conf('postgresql.conf', qq[
+	hot_standby_feedback = $hsf
+	]);
+
+	$node_standby->reload;
+
+	if ($hsf && $invalidated)
+	{
+		# With hot_standby_feedback on, xmin should advance, but catalog_xmin
+		# should still remain NULL since the logical slots have been
+		# invalidated.
+		wait_for_xmins($node_primary, $primary_slotname,
+			"xmin IS NOT NULL AND catalog_xmin IS NULL");
+	}
+	elsif ($hsf)
+	{
+		# With hot_standby_feedback on, xmin and catalog_xmin should advance.
+		wait_for_xmins($node_primary, $primary_slotname,
+			"xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+	}
+	else
+	{
+		# Both should be NULL since hot_standby_feedback is off.
+		wait_for_xmins($node_primary, $primary_slotname,
+			"xmin IS NULL AND catalog_xmin IS NULL");
+	}
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status +{ + my ($conflicting) = @_; + + if ($conflicting) + { + $res = $node_standby->safe_psql( + 'postgres', qq( + select bool_and(conflicting) from pg_replication_slots;)); + + is($res, 't', + "Logical slots are reported as conflicting"); + } + else + { + $res = $node_standby->safe_psql( + 'postgres', qq( + select bool_or(conflicting) from pg_replication_slots;)); + + is($res, 'f', + "Logical slots are reported as non conflicting"); + } +} + +######################## +# Initialize primary node +######################## + +$node_primary->init(allows_streaming => 1, has_archiving => 1); +$node_primary->append_conf('postgresql.conf', q{ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +log_min_messages = 'debug2' +log_error_verbosity = verbose +}); +$node_primary->dump_info; +$node_primary->start; + +$node_primary->psql('postgres', q[CREATE DATABASE testdb]); + +$node_primary->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]); + +# Check conflicting is NULL for physical slot +$res = $node_primary->safe_psql( + 'postgres', qq[ + SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]); + +is($res, 't', + "Physical slot reports conflicting as NULL"); + +my $backup_name = 'b1'; +$node_primary->backup($backup_name); + +####################### +# Initialize standby node +####################### + +$node_standby->init_from_backup( + $node_primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_standby->append_conf('postgresql.conf', + qq[primary_slot_name = '$primary_slotname']); +$node_standby->start; +$node_primary->wait_for_replay_catchup($node_standby); +$node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]); + +####################### +# Initialize cascading standby node +####################### +$node_standby->backup($backup_name); +$node_cascading_standby->init_from_backup( + $node_standby, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_cascading_standby->append_conf('postgresql.conf', + qq[primary_slot_name = '$standby_physical_slotname']); +$node_cascading_standby->start; +$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary); + +################################################## +# Test that logical decoding on the standby +# behaves correctly. +################################################## + +# create the logical slots +create_logical_slots($node_standby); + +$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]); +$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]); + +$node_primary->wait_for_replay_catchup($node_standby); + +my $result = $node_standby->safe_psql('testdb', + qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]); + +# test if basic decoding works +is(scalar(my @foobar = split /^/m, $result), + 14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)'); + +# Insert some rows and verify that we get the same results from pg_recvlogical +# and the SQL interface. 
+$node_primary->safe_psql('testdb', + qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;] +); + +my $expected = q{BEGIN +table public.decoding_test: INSERT: x[integer]:1 y[text]:'1' +table public.decoding_test: INSERT: x[integer]:2 y[text]:'2' +table public.decoding_test: INSERT: x[integer]:3 y[text]:'3' +table public.decoding_test: INSERT: x[integer]:4 y[text]:'4' +COMMIT}; + +$node_primary->wait_for_replay_catchup($node_standby); + +my $stdout_sql = $node_standby->safe_psql('testdb', + qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');] +); + +is($stdout_sql, $expected, 'got expected output from SQL decoding session'); + +my $endpos = $node_standby->safe_psql('testdb', + "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;" +); + +# Insert some rows after $endpos, which we won't read. +$node_primary->safe_psql('testdb', + qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;] +); + +$node_primary->wait_for_catchup($node_standby); + +my $stdout_recv = $node_standby->pg_recvlogical_upto( + 'testdb', 'activeslot', $endpos, $default_timeout, + 'include-xids' => '0', + 'skip-empty-xacts' => '1'); +chomp($stdout_recv); +is($stdout_recv, $expected, + 'got same expected output from pg_recvlogical decoding session'); + +$node_standby->poll_query_until('testdb', + "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NULL)" +) or die "slot never became inactive"; + +$stdout_recv = $node_standby->pg_recvlogical_upto( + 'testdb', 'activeslot', $endpos, $default_timeout, + 'include-xids' => '0', + 'skip-empty-xacts' => '1'); +chomp($stdout_recv); +is($stdout_recv, '', 'pg_recvlogical acknowledged changes'); + +$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb'); + +is( $node_primary->psql( + 'otherdb', + "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;" + ), + 3, + 'replaying logical slot from another database fails'); + +# drop the logical slots +drop_logical_slots(); + +################################################## +# Recovery conflict: Invalidate conflicting slots, including in-use slots +# Scenario 1: hot_standby_feedback off and vacuum FULL +################################################## + +# create the logical slots +create_logical_slots($node_standby); + +# One way to produce recovery conflict is to create/drop a relation and +# launch a vacuum full on pg_class with hot_standby_feedback turned off on +# the standby. 
+change_hot_standby_feedback_and_wait_for_xmins(0, 1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM full pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+		$node_standby,
+		"invalidating replication slot \"inactiveslot\" because it conflicts with recovery"),
+	'inactiveslot slot invalidation is logged with vacuum FULL on pg_class');
+
+ok( find_in_log(
+		$node_standby,
+		"invalidating replication slot \"activeslot\" because it conflicts with recovery"),
+	'activeslot slot invalidation is logged with vacuum FULL on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+ok( $node_standby->poll_query_until(
+		'postgres',
+		"select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated')
+  or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the
+# standby.
+change_hot_standby_feedback_and_wait_for_xmins(0, 1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+		$node_standby,
+		"invalidating replication slot \"inactiveslot\" because it conflicts with recovery", $logstart),
+	'inactiveslot slot invalidation is logged with vacuum on pg_class');
+
+ok( find_in_log(
+		$node_standby,
+		"invalidating replication slot \"activeslot\" because it conflicts with recovery", $logstart),
+	'activeslot slot invalidation is logged with vacuum on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated.
+# We now expect 2 conflicts reported, as the counter persists across reloads.
+ok( $node_standby->poll_query_until(
+		'postgres',
+		"select (confl_active_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated')
+  or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a non-catalog table
+# Scenario 3: No conflict expected.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# set hot_standby_feedback to off
+change_hot_standby_feedback_and_wait_for_xmins(0, 1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should not trigger a conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+$node_primary->safe_psql('testdb', qq[UPDATE conflict_test set x=1, y=1;]);
+$node_primary->safe_psql('testdb', 'VACUUM conflict_test;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should not be issued
+ok( !find_in_log(
+		$node_standby,
+		"invalidating slot \"inactiveslot\" because it conflicts with recovery", $logstart),
+	'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
+
+ok( !find_in_log(
+		$node_standby,
+		"invalidating slot \"activeslot\" because it conflicts with recovery", $logstart),
+	'activeslot slot invalidation is not logged with vacuum on conflict_test');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been
+# updated. We still expect 2 conflicts reported, as the counter persists
+# across reloads.
+ok( $node_standby->poll_query_until(
+		'postgres',
+		"select (confl_active_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot not updated')
+  or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as non conflicting in pg_replication_slots
+check_slots_conflicting_status(0);
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 0);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 4: conflict due to on-access pruning.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce a recovery conflict is to trigger on-access pruning
+# on a relation marked as user_catalog_table.
+change_hot_standby_feedback_and_wait_for_xmins(0, 0);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+		$node_standby,
+		"invalidating replication slot \"inactiveslot\" because it conflicts with recovery", $logstart),
+	'inactiveslot slot invalidation is logged with on-access pruning');
+
+ok( find_in_log(
+		$node_standby,
+		"invalidating replication slot \"activeslot\" because it conflicts with recovery", $logstart),
+	'activeslot slot invalidation is logged with on-access pruning');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated.
+# We now expect 3 conflicts reported, as the counter persists across reloads.
+ok( $node_standby->poll_query_until(
+		'postgres',
+		"select (confl_active_logicalslot = 3) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated')
+  or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 5: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# Set primary wal_level to replica. This will trigger a slot conflict.
+$node_primary->append_conf('postgresql.conf', q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+		$node_standby,
+		"invalidating replication slot \"inactiveslot\" because it conflicts with recovery", $logstart),
+	'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+		$node_standby,
+		"invalidating replication slot \"activeslot\" because it conflicts with recovery", $logstart),
+	'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated.
+# We now expect 4 conflicts reported, as the counter persists across reloads.
+ok( $node_standby->poll_query_until(
+		'postgres',
+		"select (confl_active_logicalslot = 4) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated')
+  or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level to be at
+# least logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires wal_level to be at least logical on the primary server");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf', q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_replay_catchup($node_standby);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# DROP DATABASE should drop its slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+is($node_standby->safe_psql('postgres',
+	q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+	'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+	'otherslot on standby not dropped');
+
+# Cleanup: manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_standby->reload;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# create the logical slots on the cascading standby too
+create_logical_slots($node_cascading_standby);
+
+# Make slots active
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 1, \$cascading_stdout, \$cascading_stderr);
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+# Wait for both standbys to catch up
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+# promote
+$node_standby->promote;
+
+# insert some rows on the promoted standby
+$node_standby->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
+);
+
+# Wait for the cascading standby to catch up
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre- and post-promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby');
+
+# check that we are decoding pre- and post-promotion inserted rows with
+# pg_recvlogical that has started before the promotion
+my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+ok( pump_until(
+		$handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
+	'got 2 COMMIT from pg_recvlogical output');
+
+chomp($stdout);
+is($stdout, $expected,
+	'got same expected output from pg_recvlogical decoding session');
+
+# check that we are decoding pre- and post-promotion inserted rows on the
+# cascading standby
+$stdout_sql = $node_cascading_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby');
+
+# check that we are decoding pre- and post-promotion inserted rows with
+# pg_recvlogical that has started before the promotion on the cascading standby
+ok( pump_until(
+		$cascading_handle, $pump_timeout, \$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
+	'got 2 COMMIT from pg_recvlogical output');
+
+chomp($cascading_stdout);
+is($cascading_stdout, $expected,
+	'got same expected output from pg_recvlogical decoding session on cascading standby');
-- 
2.34.1
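For reference, the conflict checks in the test above boil down to queries
like these on the standby (a sketch; the conflicting column and the
confl_active_logicalslot counter come from the pg_stat/catalog patch later in
this series):

    -- Are the standby's logical slots marked as invalidated by a conflict?
    SELECT slot_name, conflicting
    FROM pg_replication_slots
    WHERE slot_type = 'logical';

    -- How many active logical slots were invalidated, per database?
    SELECT datname, confl_active_logicalslot
    FROM pg_stat_database_conflicts;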
From b811e76c4535c30417ac919ed352a023954c589e Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <[email protected]>
Date: Mon, 3 Apr 2023 16:46:09 +0000
Subject: [PATCH v60 4/6] For cascading replication, wake up physical
 walsenders separately from logical walsenders.

Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would not have been applied yet, so logical walsenders were
awakened too early.

Author: Bertrand Drouvot, per an idea from Jeff Davis and Amit Kapila.
Reviewed-By: Sawada Masahiko, Robert Haas.
---
 src/backend/access/transam/xlog.c           |  6 ++--
 src/backend/access/transam/xlogarchive.c    |  2 +-
 src/backend/access/transam/xlogrecovery.c   | 17 ++++++----
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         | 36 +++++++++++++++++----
 src/include/replication/walsender.h         | 22 ++++++-------
 src/include/replication/walsender_private.h |  3 ++
 7 files changed, 59 insertions(+), 29 deletions(-)
 26.3% src/backend/access/transam/
 53.5% src/backend/replication/
 20.0% src/include/replication/

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 779f5c3711..70ac8fc33b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
 	END_CRIT_SECTION();

 	/* wake up walsenders now that we've released heavily contended locks */
-	WalSndWakeupProcessRequests();
+	WalSndWakeupProcessRequests(true, !RecoveryInProgress());

 	/*
 	 * If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
 	END_CRIT_SECTION();

 	/* wake up walsenders now that we've released heavily contended locks */
-	WalSndWakeupProcessRequests();
+	WalSndWakeupProcessRequests(true, !RecoveryInProgress());

 	/*
 	 * Great, done. To take some work off the critical path, try to initialize
@@ -5773,7 +5773,7 @@ StartupXLOG(void)
 	 * If there were cascading standby servers connected to us, nudge any wal
 	 * sender processes to notice that we've been promoted.
 	 */
-	WalSndWakeup();
+	WalSndWakeup(true, true);

 	/*
 	 * If this was a promotion, request an (online) checkpoint now. This isn't

diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index a0f5aa24b5..f3fb92c8f9 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
 	 * if we restored something other than a WAL segment, but it does no harm
 	 * either.
 	 */
-	WalSndWakeup();
+	WalSndWakeup(true, false);
 }

 /*

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..df814e6ff7 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1935,6 +1935,17 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
 	SpinLockRelease(&XLogRecoveryCtl->info_lck);

+	/*
+	 * Wake up walsenders:
+	 *
+	 * - physical walsenders, in case of a new timeline and if cascading
+	 *   replication is allowed;
+	 * - logical walsenders, in case of a new timeline or if recovery is in
+	 *   progress (logical decoding on standby).
+	 */
+	WalSndWakeup(switchedTLI && AllowCascadeReplication(),
+				 switchedTLI || RecoveryInProgress());
+
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
 	 * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1969,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 */
 	RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);

-	/*
-	 * Wake up any walsenders to notice that we are on a new timeline.
-	 */
-	if (AllowCascadeReplication())
-		WalSndWakeup();
-
 	/* Reset the prefetcher. */
 	XLogPrefetchReconfigure();
 }

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 685af51d5d..d2aa93734c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		/* Signal the startup process and walsender that new WAL has arrived */
 		WakeupRecovery();
 		if (AllowCascadeReplication())
-			WalSndWakeup();
+			WalSndWakeup(true, !RecoveryInProgress());

 		/* Report XLOG streaming progress in PS display */
 		if (update_process_title)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aeb5f93514..aaa78eed52 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2626,6 +2626,23 @@ InitWalSenderSlot(void)
 			walsnd->sync_standby_priority = 0;
 			walsnd->latch = &MyProc->procLatch;
 			walsnd->replyTime = 0;
+
+			/*
+			 * The kind assignment is done here and not in StartReplication()
+			 * and StartLogicalReplication(). This is because the logical
+			 * walsender needs to read WAL records (like the snapshot of
+			 * running transactions) during slot creation, so it needs to be
+			 * woken up based on its kind.
+			 *
+			 * The kind assignment could also be done in StartReplication(),
+			 * StartLogicalReplication() and CREATE_REPLICATION_SLOT, but it
+			 * seems better to set it in one place.
+			 */
+			if (MyDatabaseId == InvalidOid)
+				walsnd->kind = REPLICATION_KIND_PHYSICAL;
+			else
+				walsnd->kind = REPLICATION_KIND_LOGICAL;
+
 			SpinLockRelease(&walsnd->mutex);	/* don't need the lock anymore */

 			MyWalSnd = (WalSnd *) walsnd;
@@ -3308,30 +3325,35 @@ WalSndShmemInit(void)
 }

 /*
- * Wake up all walsenders
+ * Wake up physical and/or logical walsenders.
+ *
+ * The distinction between physical and logical walsenders is made because,
+ * for cascading replication, we need to wake up physical walsenders
+ * separately from logical walsenders (see the comment before the
+ * WalSndWakeup() call in ApplyWalRecord() for more details).
  *
  * This will be called inside critical sections, so throwing an error is not
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
 	int			i;

 	for (i = 0; i < max_wal_senders; i++)
 	{
 		Latch	   *latch;
+		ReplicationKind kind;
 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];

-		/*
-		 * Get latch pointer with spinlock held, for the unlikely case that
-		 * pointer reads aren't atomic (as they're 8 bytes).
-		 */
+		/* get latch pointer and kind with spinlock held */
 		SpinLockAcquire(&walsnd->mutex);
 		latch = walsnd->latch;
+		kind = walsnd->kind;
 		SpinLockRelease(&walsnd->mutex);

-		if (latch != NULL)
+		if (latch != NULL && ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+							  (logical && kind == REPLICATION_KIND_LOGICAL)))
 			SetLatch(latch);
 	}
 }

diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae..9df7e50f94 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
 extern void WalSndInitStopping(void);
 extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
 /*
  * wake up walsenders if there is work to be done
  */
-#define WalSndWakeupProcessRequests() \
-	do \
-	{ \
-		if (wake_wal_senders) \
-		{ \
-			wake_wal_senders = false; \
-			if (max_wal_senders > 0) \
-				WalSndWakeup(); \
-		} \
-	} while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+	if (wake_wal_senders)
+	{
+		wake_wal_senders = false;
+		if (max_wal_senders > 0)
+			WalSndWakeup(physical, logical);
+	}
+}

 #endif							/* _WALSENDER_H */

diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5310e054c4..ff25aa70a8 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -15,6 +15,7 @@
 #include "access/xlog.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
+#include "nodes/replnodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
 	 * Timestamp of the last message received from standby.
 	 */
 	TimestampTz replyTime;
+
+	ReplicationKind kind;
 } WalSnd;

 extern PGDLLIMPORT WalSnd *MyWalSnd;
-- 
2.34.1
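To see the physical/logical walsender distinction this patch tracks, a
hypothetical monitoring query (it assumes the walsenders of interest have
acquired a slot; slot-less walsenders show up as physical here, which is a
simplification):

    SELECT sr.pid, sr.state,
           COALESCE(rs.slot_type, 'physical') AS walsender_kind
    FROM pg_stat_replication sr
    LEFT JOIN pg_replication_slots rs ON rs.active_pid = sr.pid;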
From 5f0d4a60eb9fad8bc4f2fd2c8c168c2d822bece3 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <[email protected]>
Date: Mon, 3 Apr 2023 12:45:20 +0000
Subject: [PATCH v60 3/6] Allow logical decoding on standby.

Allow a logical slot to be created on standby. Restrict its usage or its
creation if wal_level on the primary is less than logical. During slot
creation, its restart_lsn is set to the last replayed LSN. Effectively,
a logical slot creation on standby waits for an xl_running_xacts record
to arrive from the primary.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 src/backend/access/transam/xlog.c          | 11 +++++
 src/backend/replication/logical/decode.c   | 22 ++++++++-
 src/backend/replication/logical/logical.c  | 36 +++++++-------
 src/backend/replication/slot.c             | 58 ++++++++++++-----------
 src/backend/replication/walsender.c        | 46 +++++++++++-------
 src/include/access/xlog.h                  |  1 +
 6 files changed, 113 insertions(+), 61 deletions(-)
 4.6% src/backend/access/transam/
 37.7% src/backend/replication/logical/
 56.7% src/backend/replication/

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 10085aa0d6..779f5c3711 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4466,6 +4466,17 @@ LocalProcessControlFile(bool reset)
 	ReadControlFile();
 }

+/*
+ * Get the wal_level from the control file. For a standby, this value should
+ * be considered as its active wal_level, because it may be different from
+ * what was originally configured on the standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+	return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..b37b91bbe0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * can restart from there.
 			 */
 			break;
+		case XLOG_PARAMETER_CHANGE:
+			{
+				xl_parameter_change *xlrec =
+					(xl_parameter_change *) XLogRecGetData(buf->record);
+
+				/*
+				 * If wal_level on the primary is reduced to less than
+				 * logical, we want to prevent existing logical slots from
+				 * being used. Existing logical slots on the standby get
+				 * invalidated when this WAL record is replayed; and further,
+				 * slot creation fails when the WAL level is not sufficient.
+				 * But all these operations are not synchronized, so a
+				 * logical slot may creep in while the wal_level is being
+				 * reduced. Hence this extra check.
+ */ + if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server"))); + break; + } case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: case XLOG_BACKUP_END: - case XLOG_PARAMETER_CHANGE: case XLOG_RESTORE_POINT: case XLOG_FPW_CHANGE: case XLOG_FPI_FOR_HINT: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c3ec97a0a6..60a5008b6d 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. - * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /* + * This check may have race conditions, but whenever + * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we + * verify that there are no existing logical replication slots. And to + * avoid races around creating a new slot, + * CheckLogicalDecodingRequirements() is called once before creating + * the slot, and once when logical decoding is initially starting up. + */ + if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server"))); + } } /* @@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin, LogicalDecodingContext *ctx; MemoryContext old_context; + /* + * On standby, this check is also required while creating the slot. Check + * the comments in this function. + */ + CheckLogicalDecodingRequirements(); + /* shorter lines... */ slot = MyReplicationSlot; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index aacb75bebf..efae9588f3 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -41,6 +41,7 @@ #include "access/transam.h" #include "access/xlog_internal.h" +#include "access/xlogrecovery.h" #include "common/file_utils.h" #include "common/string.h" #include "miscadmin.h" @@ -1183,37 +1184,28 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. But on a standby we cannot do WAL writes, so just use the + * replay pointer; effectively, an attempt to create a logical slot on + * standby will cause it to wait for an xl_running_xact record to be + * logged independently on the primary, so that a snapshot can be + * built using the record. * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. 
While that slightly increases
-	 * the chance that we have to retry, it's where a base backup has to
-	 * start replay at.
+	 * None of this is needed (or indeed helpful) for physical slots as
+	 * they'll start replay at the last logged checkpoint anyway. Instead
+	 * return the location of the last redo LSN. While that slightly
+	 * increases the chance that we have to retry, it's where a base
+	 * backup has to start replay at.
 	 */
-	if (!RecoveryInProgress() && SlotIsLogical(slot))
-	{
-		XLogRecPtr	flushptr;
-
-		/* start at current insert position */
+	if (SlotIsPhysical(slot))
+		restart_lsn = GetRedoRecPtr();
+	else if (RecoveryInProgress())
+		restart_lsn = GetXLogReplayRecPtr(NULL);
+	else
 		restart_lsn = GetXLogInsertRecPtr();
-		SpinLockAcquire(&slot->mutex);
-		slot->data.restart_lsn = restart_lsn;
-		SpinLockRelease(&slot->mutex);
-
-		/* make sure we have enough information to start */
-		flushptr = LogStandbySnapshot();
-		/* and make sure it's fsynced to disk */
-		XLogFlush(flushptr);
-	}
-	else
-	{
-		restart_lsn = GetRedoRecPtr();
-		SpinLockAcquire(&slot->mutex);
-		slot->data.restart_lsn = restart_lsn;
-		SpinLockRelease(&slot->mutex);
-	}
+
+	SpinLockAcquire(&slot->mutex);
+	slot->data.restart_lsn = restart_lsn;
+	SpinLockRelease(&slot->mutex);

 	/* prevent WAL removal as fast as possible */
 	ReplicationSlotsComputeRequiredLSN();
@@ -1229,8 +1221,18 @@ ReplicationSlotReserveWal(void)
 		if (XLogGetLastRemovedSegno() < segno)
 			break;
 	}
-}

+	if (!RecoveryInProgress() && SlotIsLogical(slot))
+	{
+		XLogRecPtr	flushptr;
+
+		/* make sure we have enough information to start */
+		flushptr = LogStandbySnapshot();
+
+		/* and make sure it's fsynced to disk */
+		XLogFlush(flushptr);
+	}
+}

 /*
  * Report terminating or conflicting message.

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b686691ca2..aeb5f93514 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,32 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	int			count;
 	WALReadError errinfo;
 	XLogSegNo	segno;
-	TimeLineID	currTLI = GetWALInsertionTimeLine();
+	TimeLineID	currTLI;

 	/*
-	 * Since logical decoding is only permitted on a primary server, we know
-	 * that the current timeline ID can't be changing any more. If we did this
-	 * on a standby, we'd have to worry about the values we compute here
-	 * becoming invalid due to a promotion or timeline change.
+	 * Since logical decoding is also permitted on a standby server, we need
+	 * to check if the server is in recovery to decide how to get the current
+	 * timeline ID (so that it also covers the promotion or timeline change
+	 * cases).
*/ + + /* make sure we have enough WAL available */ + flushptr = WalSndWaitForWal(targetPagePtr + reqLen); + + /* the standby could have been promoted, so check if still in recovery */ + am_cascading_walsender = RecoveryInProgress(); + + if (am_cascading_walsender) + GetXLogReplayRecPtr(&currTLI); + else + currTLI = GetWALInsertionTimeLine(); + XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI); sendTimeLineIsHistoric = (state->currTLI != currTLI); sendTimeLine = state->currTLI; sendTimeLineValidUpto = state->currTLIValidUntil; sendTimeLineNextTLI = state->nextTLI; - /* make sure we have enough WAL available */ - flushptr = WalSndWaitForWal(targetPagePtr + reqLen); - /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) return -1; @@ -937,9 +946,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req cur_page, targetPagePtr, XLOG_BLCKSZ, - state->seg.ws_tli, /* Pass the current TLI because only - * WalSndSegmentOpen controls whether new - * TLI is needed. */ + currTLI, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new TLI + * is needed. */ &errinfo)) WALReadRaiseError(&errinfo); @@ -3073,10 +3082,14 @@ XLogSendLogical(void) * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ - if (flushPtr == InvalidXLogRecPtr) - flushPtr = GetFlushRecPtr(NULL); - else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) - flushPtr = GetFlushRecPtr(NULL); + if (flushPtr == InvalidXLogRecPtr || + logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + { + if (am_cascading_walsender) + flushPtr = GetStandbyFlushRecPtr(NULL); + else + flushPtr = GetFlushRecPtr(NULL); + } /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) @@ -3167,7 +3180,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli) receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); - *tli = replayTLI; + if (tli) + *tli = replayTLI; result = replayPtr; if (receiveTLI == replayTLI && receivePtr > replayPtr) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index cfe5409738..48ca852381 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -230,6 +230,7 @@ extern void XLOGShmemInit(void); extern void BootStrapXLOG(void); extern void InitializeWalConsistencyChecking(void); extern void LocalProcessControlFile(bool reset); +extern WalLevel GetActiveWalLevelOnStandby(void); extern void StartupXLOG(void); extern void ShutdownXLOG(int code, Datum arg); extern void CreateCheckPoint(int flags); -- 2.34.1
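A minimal end-to-end sketch of what this patch enables (the slot name is made
up; run on the standby once the primary's changes have been replayed):

    SELECT * FROM pg_create_logical_replication_slot('slot_on_standby', 'test_decoding');
    SELECT data FROM pg_logical_slot_get_changes('slot_on_standby', NULL, NULL);

    -- With wal_level < logical on the primary, both steps instead fail with:
    -- ERROR:  logical decoding on standby requires wal_level to be at least
    -- logical on the primary server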
From abbcd53b92c22205771d36996a1eda6f02d46ee4 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <[email protected]>
Date: Mon, 3 Apr 2023 11:28:30 +0000
Subject: [PATCH v60 2/6] Arrange for new pg_stat_database_conflicts and
 pg_replication_slots fields

As we handled logical slot conflicts on standby in the previous commit,
we can now expose the conflicts in pg_stat_database_conflicts and
pg_replication_slots. To do so, add:

- confl_active_logicalslot in pg_stat_database_conflicts
- conflicting in pg_replication_slots
---
 doc/src/sgml/monitoring.sgml                 | 11 +++++++++++
 doc/src/sgml/system-views.sgml               | 10 ++++++++++
 src/backend/catalog/system_views.sql         |  6 ++++--
 src/backend/replication/slotfuncs.c          | 12 +++++++++++-
 src/backend/utils/activity/pgstat_database.c |  4 ++++
 src/backend/utils/adt/pgstatfuncs.c          |  3 +++
 src/include/catalog/pg_proc.dat              | 11 ++++++++---
 src/include/pgstat.h                         |  1 +
 src/test/regress/expected/rules.out          |  8 +++++---
 9 files changed, 57 insertions(+), 9 deletions(-)
 33.7% doc/src/sgml/
 8.1% src/backend/catalog/
 13.1% src/backend/replication/
 5.9% src/backend/utils/activity/
 5.6% src/backend/utils/adt/
 24.6% src/include/catalog/
 6.9% src/test/regress/expected/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index fd0ffbb1e0..9fc585b9e7 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4674,6 +4674,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        deadlocks
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of active logical slots in this database that have been
+       invalidated because they conflict with recovery (note that inactive
+       ones are also invalidated but do not increment this counter)
+      </para></entry>
+     </row>
    </tbody>
   </tgroup>
 </table>

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index bb1a418450..57b228076e 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       false for physical slots.
      </para></entry>
     </row>
+
+    <row>
+     <entry role="catalog_table_entry"><para role="column_definition">
+      <structfield>conflicting</structfield> <type>bool</type>
+     </para>
+     <para>
+      True if this logical slot conflicted with recovery (and so is now
+      invalidated). Always NULL for physical slots.
+ </para></entry> + </row> </tbody> </tgroup> </table> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 6b098234f8..c25067d06d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -999,7 +999,8 @@ CREATE VIEW pg_replication_slots AS L.confirmed_flush_lsn, L.wal_status, L.safe_wal_size, - L.two_phase + L.two_phase, + L.conflicting FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); @@ -1067,7 +1068,8 @@ CREATE VIEW pg_stat_database_conflicts AS pg_stat_get_db_conflict_lock(D.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin, - pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock + pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock, + pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_active_logicalslot FROM pg_database D; CREATE VIEW pg_stat_user_functions AS diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 015d276fd9..6473c73eca 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 14 +#define PG_GET_REPLICATION_SLOTS_COLS 15 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -403,6 +403,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.two_phase); + if (slot_contents.data.database == InvalidOid) + nulls[i++] = true; + else + { + if (LogicalReplicationSlotIsInvalid(slot)) + values[i++] = BoolGetDatum(true); + else + values[i++] = BoolGetDatum(false); + } + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c index 6e650ceaad..7149f22f72 100644 --- a/src/backend/utils/activity/pgstat_database.c +++ b/src/backend/utils/activity/pgstat_database.c @@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason) case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: dbentry->conflict_bufferpin++; break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + dbentry->conflict_logicalslot++; + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: dbentry->conflict_startup_deadlock++; break; @@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) PGSTAT_ACCUM_DBCOUNT(conflict_tablespace); PGSTAT_ACCUM_DBCOUNT(conflict_lock); PGSTAT_ACCUM_DBCOUNT(conflict_snapshot); + PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot); PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin); PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index eec9f3cf9b..4de60d8aa1 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1066,6 +1066,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit) /* pg_stat_get_db_xact_rollback */ PG_STAT_GET_DBENTRY_INT64(xact_rollback) +/* pg_stat_get_db_conflict_logicalslot */ +PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot) Datum pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS) @@ -1099,6 +1101,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS) result = (int64) (dbentry->conflict_tablespace + dbentry->conflict_lock + dbentry->conflict_snapshot + + dbentry->conflict_logicalslot + 
dbentry->conflict_bufferpin + dbentry->conflict_startup_deadlock); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f9f2642201..bcbae9036d 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5605,6 +5605,11 @@ proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's', proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', prosrc => 'pg_stat_get_db_conflict_snapshot' }, +{ oid => '9901', + descr => 'statistics: recovery conflicts in database caused by logical replication slot', + proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's', + proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', + prosrc => 'pg_stat_get_db_conflict_logicalslot' }, { oid => '3068', descr => 'statistics: recovery conflicts in database caused by shared buffer pin', proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's', @@ -11071,9 +11076,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 75d258d921..fa3d326d86 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -331,6 +331,7 @@ typedef struct PgStat_StatDBEntry PgStat_Counter conflict_tablespace; PgStat_Counter conflict_lock; PgStat_Counter conflict_snapshot; + PgStat_Counter conflict_logicalslot; PgStat_Counter conflict_bufferpin; PgStat_Counter conflict_startup_deadlock; PgStat_Counter temp_files; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index ab1aebfde4..06d3f1f5d3 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name, l.confirmed_flush_lsn, l.wal_status, l.safe_wal_size, - l.two_phase - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase) + l.two_phase, + l.conflicting + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, @@ -1869,7 +1870,8 @@ pg_stat_database_conflicts| SELECT oid AS datid, pg_stat_get_db_conflict_lock(oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(oid) AS confl_bufferpin, - pg_stat_get_db_conflict_startup_deadlock(oid) AS 
confl_deadlock + pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock, + pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot FROM pg_database d; pg_stat_gssapi| SELECT pid, gss_auth AS gss_authenticated, -- 2.34.1
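Again just an illustration with made-up names: once this patch is in, the new fields can be inspected like any other statistics data, for example to check which slots got invalidated and how often active slots conflicted per database:

standby=# select slot_name, slot_type, conflicting from pg_replication_slots;
standby=# select datname, confl_active_logicalslot from pg_stat_database_conflicts;

As documented above, conflicting is NULL for physical slots, and confl_active_logicalslot only counts slots that were actively in use when they got invalidated.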
From 6879e222dcf35ddf802caa78d0f1f7d118511c61 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS <[email protected]> Date: Tue, 7 Feb 2023 08:57:56 +0000 Subject: [PATCH v60 1/6] Handle logical slot conflicts on standby. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During WAL replay on standby, when a slot conflict is identified, invalidate the conflicting slots. Also do the same if wal_level on the primary server is reduced below 'logical' while logical slots exist on the standby. Introduce a new ProcSignalReason value for slot conflict recovery. Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello, Bharath Rupireddy, Amit Kapila, Álvaro Herrera --- src/backend/access/gist/gistxlog.c | 2 + src/backend/access/hash/hash_xlog.c | 1 + src/backend/access/heap/heapam.c | 3 + src/backend/access/nbtree/nbtxlog.c | 2 + src/backend/access/spgist/spgxlog.c | 1 + src/backend/access/transam/xlog.c | 21 +- .../replication/logical/logicalfuncs.c | 13 +- src/backend/replication/slot.c | 189 ++++++++++++++---- src/backend/replication/slotfuncs.c | 3 +- src/backend/replication/walsender.c | 7 + src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 13 +- src/backend/tcop/postgres.c | 28 +++ src/include/replication/slot.h | 55 ++++- src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 2 + 16 files changed, 293 insertions(+), 51 deletions(-) 7.0% src/backend/access/transam/ 5.3% src/backend/replication/logical/ 58.1% src/backend/replication/ 5.0% src/backend/storage/ipc/ 7.8% src/backend/tcop/ 13.0% src/include/replication/ 3.3% src/ diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index b7678f3c14..9a86fb3fef 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, + xldata->isCatalogRel, rlocator); } @@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, xlrec->locator); } diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index f2dd9be8d3..e8e06c62a9 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, + xldata->isCatalogRel, rlocator); } diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index f7d9ce59a4..371e855683 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8717,6 +8717,7 @@ heap_xlog_prune(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, rlocator); /* @@ -8888,6 +8889,7 @@ heap_xlog_visible(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL, rlocator); /* @@ -9009,6 +9011,7 @@ heap_xlog_freeze_page(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, rlocator); } diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 414ca4f6de..c87e46ed66 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, rlocator); } @@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, xlrec->locator); } diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index b071b59c8a..459ac929ba 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &locator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, + xldata->isCatalogRel, locator); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 46821ad605..10085aa0d6 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6806,7 +6806,7 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(_logSegNo)) + if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, NULL)) { /* * Some slots have been invalidated; recalculate the old-segment @@ -7250,7 +7250,7 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(_logSegNo)) + if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, NULL)) { /* * Some slots have been invalidated; recalculate the old-segment @@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); + /* + * Invalidate logical slots if we are in hot standby and the primary + * does not have a WAL level sufficient for logical decoding. No need + * to search for potentially conflicting logical slots if the standby + * is running with wal_level lower than logical, because in that case, + * we would have either disallowed creation of logical slots or + * invalidated existing ones. + */ + if (InRecovery && InHotStandby && + xlrec.wal_level < WAL_LEVEL_LOGICAL && + wal_level >= WAL_LEVEL_LOGICAL) + { + TransactionId ConflictHorizon = InvalidTransactionId; + + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, InvalidOid, &ConflictHorizon); + } + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index fa1b641a2b..575a047e53 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -216,9 +216,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* * After the sanity checks in CreateDecodingContext, make sure the
Avoid "cannot get changes" wording in this - * errmsg because that'd be confusingly ambiguous about no changes - * being available. + * restart_lsn is valid or both effective_xmin and catalog_xmin are + * valid. Avoid "cannot get changes" wording in this errmsg because + * that'd be confusingly ambiguous about no changes being available. */ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, @@ -227,6 +227,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(*name)), errdetail("This slot has never previously reserved WAL, or it has been invalidated."))); + if (LogicalReplicationSlotIsInvalid(MyReplicationSlot)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot read from logical replication slot \"%s\"", + NameStr(*name)), + errdetail("This slot has been invalidated because it was conflicting with recovery."))); + MemoryContextSwitchTo(oldcontext); /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2293c0c6fc..aacb75bebf 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -110,6 +110,13 @@ static void RestoreSlotFromDisk(const char *name); static void CreateSlotOnDisk(ReplicationSlot *slot); static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel); +/* to report termination/invalidation */ +static void ReportTerminationInvalidation(bool terminating, bool check_on_xid, + int pid, NameData slotname, + TransactionId *xid, + XLogRecPtr restart_lsn, + XLogRecPtr oldestLSN); + /* * Report shared-memory space needed by ReplicationSlotsShmemInit. */ @@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) SpinLockAcquire(&s->mutex); effective_xmin = s->effective_xmin; effective_catalog_xmin = s->effective_catalog_xmin; - invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) && - XLogRecPtrIsInvalid(s->data.restart_lsn)); + invalidated = ObsoleteSlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s); SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ @@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void) } } + +/* + * Report a termination or an invalidation message. + * + * Both logical conflicts on standby and obsolete slots are handled here. + */ +static void +ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid, + NameData slotname, TransactionId *xid, + XLogRecPtr restart_lsn, XLogRecPtr oldestLSN) +{ + StringInfoData err_msg; + StringInfoData err_detail; + bool hint = false; + + initStringInfo(&err_detail); + + if (check_on_xid) + { + if (!terminating) + { + initStringInfo(&err_msg); + appendStringInfo(&err_msg, _("invalidating replication slot \"%s\" because it conflicts with recovery"), + NameStr(slotname)); + } + + if (TransactionIdIsValid(*xid)) + appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."), *xid); + else + appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server.")); + } + else + { + if (!terminating) + { + initStringInfo(&err_msg); + appendStringInfo(&err_msg, _("invalidating obsolete replication slot \"%s\""), + NameStr(slotname)); + } + + appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."), + LSN_FORMAT_ARGS(restart_lsn), + (unsigned long long) (oldestLSN - restart_lsn)); + + hint = true; + } + + ereport(LOG, + terminating ?
errmsg("terminating process %d to release replication slot \"%s\"", pid, NameStr(slotname)) : + errmsg_internal("%s", err_msg.data), + errdetail_internal("%s", err_detail.data), + hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0); + + if (!terminating) + pfree(err_msg.data); + + pfree(err_detail.data); +} + /* - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot - * and mark it invalid, if necessary and possible. + * Helper for InvalidateObsoleteReplicationSlots + * + * Acquires the given slot and marks it invalid, if necessary and possible. * * Returns whether ReplicationSlotControlLock was released in the interim (and * in that case we're not holding the lock at return, otherwise we are). * - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.) + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.) * * This is inherently racy, because we release the LWLock * for syscalls, so caller must restart if we return true. */ static bool InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, - bool *invalidated) + bool *invalidated, TransactionId *xid) { int last_signaled_pid = 0; bool released_lock = false; + bool check_on_xid; + + check_on_xid = xid ? true : false; for (;;) { XLogRecPtr restart_lsn; + NameData slotname; int active_pid = 0; @@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, * Check if the slot needs to be invalidated. If it needs to be * invalidated, and is not currently acquired, acquire it and mark it * as having been invalidated. We do this with the spinlock held to - * avoid race conditions -- for example the restart_lsn could move - * forward, or the slot could be dropped. + * avoid race conditions -- for example the restart_lsn (or the + * xmin(s)) could move forward, or the slot could be dropped. */ SpinLockAcquire(&s->mutex); restart_lsn = s->data.restart_lsn; /* - * If the slot is already invalid or is fresh enough, we don't need to - * do anything. + * If the slot is already invalid or is a non-conflicting slot, we + * don't need to do anything.
*/ - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) + if (DoNotInvalidateSlot(s, xid, &oldestLSN)) { + /* no invalidation is needed for this slot */ SpinLockRelease(&s->mutex); if (released_lock) LWLockRelease(ReplicationSlotControlLock); @@ -1294,9 +1365,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, { MyReplicationSlot = s; s->active_pid = MyProcPid; - s->data.invalidated_at = restart_lsn; - s->data.restart_lsn = InvalidXLogRecPtr; - + if (xid) + { + s->effective_xmin = InvalidTransactionId; + s->data.catalog_xmin = InvalidTransactionId; + } + else + { + s->data.invalidated_at = restart_lsn; + s->data.restart_lsn = InvalidXLogRecPtr; + } /* Let caller know */ *invalidated = true; } @@ -1329,15 +1407,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, */ if (last_signaled_pid != active_pid) { - ereport(LOG, - errmsg("terminating process %d to release replication slot \"%s\"", - active_pid, NameStr(slotname)), - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", - LSN_FORMAT_ARGS(restart_lsn), - (unsigned long long) (oldestLSN - restart_lsn)), - errhint("You might need to increase max_slot_wal_keep_size.")); - - (void) kill(active_pid, SIGTERM); + ReportTerminationInvalidation(true, check_on_xid, active_pid, + slotname, xid, restart_lsn, + oldestLSN); + + if (check_on_xid) + (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId); + else + (void) kill(active_pid, SIGTERM); + last_signaled_pid = active_pid; } @@ -1370,14 +1448,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, ReplicationSlotMarkDirty(); ReplicationSlotSave(); ReplicationSlotRelease(); + pgstat_drop_replslot(s); - ereport(LOG, - errmsg("invalidating obsolete replication slot \"%s\"", - NameStr(slotname)), - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", - LSN_FORMAT_ARGS(restart_lsn), - (unsigned long long) (oldestLSN - restart_lsn)), - errhint("You might need to increase max_slot_wal_keep_size.")); + ReportTerminationInvalidation(false, check_on_xid, active_pid, + slotname, xid, restart_lsn, + oldestLSN); /* done with this slot for now */ break; @@ -1390,20 +1465,36 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, } /* - * Mark any slot that points to an LSN older than the given segment - * as invalid; it requires WAL that's about to be removed. + * Invalidate obsolete slots or resolve recovery conflicts with logical slots. * - * Returns true when any slot have got invalidated. + * Obsolete case (i.e. xid is NULL): * - * NB - this runs as part of checkpoint, so avoid raising errors if possible. + * Mark any slot that points to an LSN older than the given segment + * as invalid; it requires WAL that's about to be removed. + * *invalidated is set to true when any slot has been invalidated. + * + * Logical replication slot case: + * + * When xid is valid, it means that we are about to remove rows older than xid. + * Therefore we need to invalidate slots that depend on seeing those rows. + * When xid is invalid, invalidate all logical slots. This is required when the + * primary's wal_level is set back to replica, in which case existing logical + * slots need to be invalidated.
*/ bool -InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) +InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid, TransactionId *xid) { - XLogRecPtr oldestLSN; + + XLogRecPtr oldestLSN = InvalidXLogRecPtr; bool invalidated = false; - XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); + Assert(max_replication_slots >= 0); + + if (max_replication_slots == 0) + return invalidated; + + if (!xid) + XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); restart: LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); @@ -1414,21 +1505,35 @@ restart: if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated)) + if (xid) { - /* if the lock was released, start from scratch */ - goto restart; + /* we are only dealing with *logical* slot conflicts */ + if (!SlotIsLogical(s)) + continue; + + /* + * Not the database of interest, and we are not invalidating + * slots in all databases; skip. + */ + if (s->data.database != dboid && TransactionIdIsValid(*xid)) + continue; } + + if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated, xid)) + goto restart; } + LWLockRelease(ReplicationSlotControlLock); /* - * If any slots have been invalidated, recalculate the resource limits. + * If any slots have been invalidated, recalculate the required xmin and + * the required lsn (if appropriate). */ if (invalidated) { ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + if (!xid) + ReplicationSlotsComputeRequiredLSN(); } return invalidated; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2f3c964824..015d276fd9 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -319,8 +319,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) * certain that the slot has been invalidated. Otherwise, test * availability from restart_lsn.
*/ - if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) && - !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at)) + if (ObsoleteSlotIsInvalid(slot, true)) walstate = WALAVAIL_REMOVED; else walstate = GetWALAvailability(slot_contents.data.restart_lsn); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 75e8363e24..b686691ca2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1253,6 +1253,13 @@ StartLogicalReplication(StartReplicationCmd *cmd) ReplicationSlotAcquire(cmd->slotname, true); + if (LogicalReplicationSlotIsInvalid(MyReplicationSlot)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot read from logical replication slot \"%s\"", + cmd->slotname), + errdetail("This slot has been invalidated because it was conflicting with recovery."))); + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 395b2cf690..c85cb5cc18 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 9f56b4e95c..c62245afc7 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -24,6 +24,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, */ void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator) { VirtualTransactionId *backends; @@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT, true); + + if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, locator.dbOid, &snapshotConflictHorizon); } /* @@ -499,6 +504,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, */ void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator) { /* @@ -517,7 +523,9 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor TransactionId truncated; truncated = XidFromFullTransactionId(snapshotConflictHorizon); - ResolveRecoveryConflictWithSnapshot(truncated, locator); + ResolveRecoveryConflictWithSnapshot(truncated, + isCatalogRel, + locator); } } @@ -1478,6 +1486,9 @@ get_recovery_conflict_desc(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: reasonDesc = _("recovery conflict on snapshot"); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + reasonDesc = _("recovery conflict on replication slot"); + break; case 
PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: reasonDesc = _("recovery conflict on buffer deadlock"); break; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index a10ecbaf50..fb2de8542d 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: errdetail("User query might have needed to see row versions that must be removed."); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + errdetail("User was using a logical replication slot that must be invalidated."); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; @@ -3137,6 +3140,31 @@ RecoveryConflictInterrupt(ProcSignalReason reason) /* Intentional fall through to session cancel */ /* FALLTHROUGH */ + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + + /* + * For conflicts that require a logical slot to be + * invalidated, the requirement is for the signal receiver to + * release the slot, so that it can be invalidated by the + * signal sender. So for normal backends, the transaction + * should be aborted, just like for other recovery conflicts. + * But if it's a walsender on a standby, we don't want to go + * through the following IsTransactionOrTransactionBlock() + * check, so break here. + */ + if (am_cascading_walsender && + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT && + MyReplicationSlot && SlotIsLogical(MyReplicationSlot)) + { + RecoveryConflictPending = true; + QueryCancelPending = true; + InterruptPending = true; + break; + } + + /* Intentional fall through to session cancel */ + /* FALLTHROUGH */ + case PROCSIG_RECOVERY_CONFLICT_DATABASE: RecoveryConflictPending = true; ProcDiePending = true; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8872c80cdf..914b6aebc3 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -168,6 +168,58 @@ typedef struct ReplicationSlot #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid) +static inline bool +ObsoleteSlotIsInvalid(ReplicationSlot *s, bool check_invalidated_at) +{ + if (check_invalidated_at) + return (!XLogRecPtrIsInvalid(s->data.invalidated_at) && + XLogRecPtrIsInvalid(s->data.restart_lsn)); + else + return (XLogRecPtrIsInvalid(s->data.restart_lsn)); +} + +static inline bool +LogicalReplicationSlotIsInvalid(ReplicationSlot *s) +{ + return (!TransactionIdIsValid(s->effective_xmin) && + !TransactionIdIsValid(s->data.catalog_xmin)); +} + +static inline bool +LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid) +{ + TransactionId slot_effective_xmin; + TransactionId slot_catalog_xmin; + + slot_effective_xmin = s->effective_xmin; + slot_catalog_xmin = s->data.catalog_xmin; + + return (((TransactionIdIsValid(slot_effective_xmin) && TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) || + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)))); +} + +static inline bool +SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN) +{ + return (s->data.restart_lsn >= oldestLSN); +} + +static inline bool +LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid) +{ + return (TransactionIdIsValid(*xid) && !LogicalReplicationSlotXidsConflict(s, *xid)); +} + +static inline bool +DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr *oldestLSN) +{ + if (xid)
return (LogicalReplicationSlotIsInvalid(s) || LogicalSlotIsNotConflicting(s, xid)); + else + return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, *oldestLSN)); + +} + /* * Shared memory control area for all of replication slots. */ @@ -215,7 +267,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); -extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); +extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid, TransactionId *xid); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); @@ -227,5 +279,6 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason); #endif /* SLOT_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 905af2231b..2f52100b00 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -42,6 +42,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_TABLESPACE, PROCSIG_RECOVERY_CONFLICT_LOCK, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 2effdea126..41f4dc372e 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void); extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator); extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); -- 2.34.1
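For illustration only, with a made-up slot name: once a logical slot has been invalidated by one of the paths above, both pg_logical_slot_get_changes() and START_REPLICATION now fail with the new error added in this patch, e.g.

standby=# select * from pg_logical_slot_get_changes('standby_slot', NULL, NULL);
ERROR:  cannot read from logical replication slot "standby_slot"
DETAIL:  This slot has been invalidated because it was conflicting with recovery.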
