Hi,
On 12/16/22 6:24 PM, Drouvot, Bertrand wrote:
Hi,
On 12/16/22 5:38 PM, Robert Haas wrote:
On Fri, Dec 16, 2022 at 10:08 AM Drouvot, Bertrand
<bertranddrouvot...@gmail.com> wrote:
After 1489b1ce728 the name mayConflictInLogicalDecoding seems odd. Seems
it should be a riff on snapshotConflictHorizon?
Gotcha, what about logicalSnapshotConflictThreat?
logicalConflictPossible? checkDecodingConflict?
I think we should try to keep this to three words if we can. There's
not likely to be enough value in a fourth word to make up for the
downside of being more verbose.
Yeah agree, I'd vote for logicalConflictPossible then.
Please find attached v33 using logicalConflictPossible as the new field name
instead of mayConflictInLogicalDecoding.
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 68890d7a0edcd997c0daab6e375a779656367797 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 08:38:56 +0000
Subject: [PATCH v33 6/6] Fixing Walsender corner case with logical decoding on
standby.
The problem is that WalSndWaitForWal() waits for the *replay* LSN to
increase, but gets woken up by walreceiver when new WAL has been
flushed. This means that walsenders are typically woken up at the
same time as the startup process, so by the time the logical
walsender checks GetXLogReplayRecPtr() it is unlikely that the
startup process has already replayed the record and updated
XLogRecoveryCtl->lastReplayedEndRecPtr.
Introducing a new condition variable to fix this corner case.
---
src/backend/access/transam/xlogrecovery.c | 28 ++++++++++++++++++++
src/backend/replication/walsender.c | 31 +++++++++++++++++------
src/backend/utils/activity/wait_event.c | 3 +++
src/include/access/xlogrecovery.h | 3 +++
src/include/replication/walsender.h | 1 +
src/include/utils/wait_event.h | 1 +
6 files changed, 59 insertions(+), 8 deletions(-)
41.2% src/backend/access/transam/
48.5% src/backend/replication/
3.6% src/backend/utils/activity/
3.4% src/include/access/
diff --git a/src/backend/access/transam/xlogrecovery.c
b/src/backend/access/transam/xlogrecovery.c
index d5a81f9d83..ac8b169ab5 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
RecoveryPauseState recoveryPauseState;
ConditionVariable recoveryNotPausedCV;
+ /* Replay state (see getReplayedCV() for more explanation) */
+ ConditionVariable replayedCV;
+
slock_t info_lck; /* locks shared variables shown
above */
} XLogRecoveryCtlData;
@@ -467,6 +470,7 @@ XLogRecoveryShmemInit(void)
SpinLockInit(&XLogRecoveryCtl->info_lck);
InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
+ ConditionVariableInit(&XLogRecoveryCtl->replayedCV);
}
/*
@@ -1916,6 +1920,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord
*record, TimeLineID *repl
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ /*
+ * wake up walsender(s) used by logical decoding on standby.
+ */
+ ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV);
+
/*
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
* receiver so that it notices the updated lastReplayedEndRecPtr and
sends
@@ -4916,3 +4925,22 @@ assign_recovery_target_xid(const char *newval, void
*extra)
else
recoveryTarget = RECOVERY_TARGET_UNSET;
}
+
+/*
+ * Return the ConditionVariable indicating that a replay has been done.
+ *
+ * This is needed for logical decoding on standby. The problem is that
+ * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up
+ * by walreceiver when new WAL has been flushed. This means that walsenders
+ * are typically woken up at the same time as the startup process, so by the
+ * time the logical walsender checks GetXLogReplayRecPtr() it is unlikely that
+ * the startup process has already replayed the record and updated
+ * XLogRecoveryCtl->lastReplayedEndRecPtr.
+ *
+ * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case.
+ */
+ConditionVariable *
+getReplayedCV(void)
+{
+ return &XLogRecoveryCtl->replayedCV;
+}
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index 9662e316c9..8c8dbe812f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1548,6 +1548,7 @@ WalSndWaitForWal(XLogRecPtr loc)
{
int wakeEvents;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+ ConditionVariable *replayedCV = getReplayedCV();
/*
* Fast path to avoid acquiring the spinlock in case we already know we
@@ -1566,7 +1567,6 @@ WalSndWaitForWal(XLogRecPtr loc)
for (;;)
{
- long sleeptime;
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
@@ -1650,20 +1650,35 @@ WalSndWaitForWal(XLogRecPtr loc)
WalSndKeepaliveIfNecessary();
/*
- * Sleep until something happens or we time out. Also wait for
the
- * socket becoming writable, if there's still pending output.
+ * When not in recovery, sleep until something happens or we
time out.
+ * Also wait for the socket becoming writable, if there's still
pending output.
* Otherwise we might sit on sendable output data while waiting
for
* new WAL to be generated. (But if we have nothing to send,
we don't
* want to wake on socket-writable.)
*/
- sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+ if (!RecoveryInProgress())
+ {
+ long sleeptime;
+ sleeptime =
WalSndComputeSleeptime(GetCurrentTimestamp());
- wakeEvents = WL_SOCKET_READABLE;
+ wakeEvents = WL_SOCKET_READABLE;
- if (pq_is_send_pending())
- wakeEvents |= WL_SOCKET_WRITEABLE;
+ if (pq_is_send_pending())
+ wakeEvents |= WL_SOCKET_WRITEABLE;
- WalSndWait(wakeEvents, sleeptime,
WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+ WalSndWait(wakeEvents, sleeptime * 10,
WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+ }
+ else
+ /*
+ * We are in the logical decoding on standby case.
+ * Wait for the startup process to replay WAL record(s), using a
+ * timeout so that we can react if we are requested to stop.
+ */
+ {
+ ConditionVariablePrepareToSleep(replayedCV);
+ ConditionVariableTimedSleep(replayedCV, 1000,
+
WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
+ }
}
/* reactivate latch so WalSndLoop knows to continue */
diff --git a/src/backend/utils/activity/wait_event.c
b/src/backend/utils/activity/wait_event.c
index b2abd75ddb..3f6059805a 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -457,6 +457,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
event_name = "WalReceiverWaitStart";
break;
+ case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY:
+ event_name = "WalReceiverWaitReplay";
+ break;
case WAIT_EVENT_XACT_GROUP_UPDATE:
event_name = "XactGroupUpdate";
break;
diff --git a/src/include/access/xlogrecovery.h
b/src/include/access/xlogrecovery.h
index f3398425d8..0afd57ecac 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -15,6 +15,7 @@
#include "catalog/pg_control.h"
#include "lib/stringinfo.h"
#include "utils/timestamp.h"
+#include "storage/condition_variable.h"
/*
* Recovery target type.
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char
*param_name, int currValue,
extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
+extern ConditionVariable *getReplayedCV(void);
+
#endif /* XLOGRECOVERY_H */
diff --git a/src/include/replication/walsender.h
b/src/include/replication/walsender.h
index 8336a6e719..550ef3107f 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
#define _WALSENDER_H
#include <signal.h>
+#include "storage/condition_variable.h"
/*
* What to do with a snapshot in create replication slot command.
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 0b2100be4a..30c2cf35ae 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -128,6 +128,7 @@ typedef enum
WAIT_EVENT_SYNC_REP,
WAIT_EVENT_WAL_RECEIVER_EXIT,
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+ WAIT_EVENT_WAL_SENDER_WAIT_REPLAY,
WAIT_EVENT_XACT_GROUP_UPDATE
} WaitEventIPC;
--
2.34.1
From 877162c72b96bc57e3253e9d1156ccc63f3f605b Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 08:38:10 +0000
Subject: [PATCH v33 5/6] Doc changes describing details about logical
decoding.
Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes
Mello
---
doc/src/sgml/logicaldecoding.sgml | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
100.0% doc/src/sgml/
diff --git a/doc/src/sgml/logicaldecoding.sgml
b/doc/src/sgml/logicaldecoding.sgml
index 38ee69dccc..9acf16037a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,27 @@ postgres=# select * from
pg_logical_slot_get_changes('regression_slot', NULL, NU
may consume changes from a slot at any given time.
</para>
+ <para>
+ A logical replication slot can also be created on a hot standby. To
prevent
+ <command>VACUUM</command> from removing required rows from the system
+ catalogs, <varname>hot_standby_feedback</varname> should be set on the
+ standby. In spite of that, if any required rows get removed, the slot gets
+ invalidated. It's highly recommended to use a physical slot between the
primary
+ and the standby. Otherwise, <varname>hot_standby_feedback</varname> will
+ work, but only while the connection is alive (for example, a node restart
+ would break it). Existing logical slots on the standby also get invalidated
+ if <varname>wal_level</varname> on the primary is reduced to less than
+ <literal>logical</literal>.
+ </para>
+
+ <para>
+ Creating a logical slot requires building a historic snapshot, for which
+ information about all the currently running transactions is essential. On
+ the primary, this information is available directly, but on a standby it
+ has to be obtained from the primary. So, slot creation may wait for some
+ activity to happen on the primary. If the primary is idle, creating a
+ logical slot on a standby may take a noticeable time.
+ </para>
+
<caution>
<para>
Replication slots persist across crashes and know nothing about the state
--
2.34.1
From 9a3ab2bc33dfeea615b338ea13ec9c926971574d Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 08:37:31 +0000
Subject: [PATCH v33 4/6] New TAP test for logical decoding on standby.
Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes
Mello
---
src/test/perl/PostgreSQL/Test/Cluster.pm | 37 ++
src/test/recovery/meson.build | 1 +
.../t/034_standby_logical_decoding.pl | 479 ++++++++++++++++++
3 files changed, 517 insertions(+)
6.0% src/test/perl/PostgreSQL/Test/
93.7% src/test/recovery/t/
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm
b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 7411188dc8..171dc85388 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3037,6 +3037,43 @@ $SIG{TERM} = $SIG{INT} = sub {
=pod
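+=item $node->create_logical_slot_on_standby(master, slot_name, dbname)
+
+Create a logical replication slot named slot_name in database dbname on the
+standby $node. A CHECKPOINT is issued on master while the slot is being created.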
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+ my ($self, $master, $slot_name, $dbname) = @_;
+ my ($stdout, $stderr);
+
+ my $handle;
+
+ $handle = IPC::Run::start(['pg_recvlogical', '-d',
$self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name,
'--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+ # Once slot restart_lsn is created, the standby looks for
xl_running_xacts
+ # WAL record from the restart_lsn onwards. So firstly, wait until the
slot
+ # restart_lsn is evaluated.
+
+ $self->poll_query_until(
+ 'postgres', qq[
+ SELECT restart_lsn IS NOT NULL
+ FROM pg_catalog.pg_replication_slots WHERE slot_name =
'$slot_name'
+ ]) or die "timed out waiting for logical slot to calculate its
restart_lsn";
+
+ # Now arrange for the xl_running_xacts record for which pg_recvlogical
+ # is waiting.
+ $master->safe_psql('postgres', 'CHECKPOINT');
+
+ $handle->finish();
+
+ is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on
standby created')
+ or die "could not create slot" . $slot_name;
+}
+
+=pod
+
=back
=cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b0e398363f..d68ee9b663 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -38,6 +38,7 @@ tests += {
't/031_recovery_conflict.pl',
't/032_relfilenode_reuse.pl',
't/033_replay_tsp_drops.pl',
+ 't/034_standby_logical_decoding.pl',
],
},
}
diff --git a/src/test/recovery/t/034_standby_logical_decoding.pl
b/src/test/recovery/t/034_standby_logical_decoding.pl
new file mode 100644
index 0000000000..4258844c8f
--- /dev/null
+++ b/src/test/recovery/t/034_standby_logical_decoding.pl
@@ -0,0 +1,479 @@
+# logical decoding on standby : test logical decoding,
+# recovery conflict and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use Test::More tests => 42;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+ my ($node, $pat, $off) = @_;
+
+ $off = 0 unless defined $off;
+ my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+ return 0 if (length($log) <= $off);
+
+ $log = substr($log, $off);
+
+ return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+ my ($node, $slotname, $check_expr) = @_;
+
+ $node->poll_query_until(
+ 'postgres', qq[
+ SELECT $check_expr
+ FROM pg_catalog.pg_replication_slots
+ WHERE slot_name = '$slotname';
+ ]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+ $node_standby->create_logical_slot_on_standby($node_primary,
'inactiveslot', 'testdb');
+ $node_standby->create_logical_slot_on_standby($node_primary,
'activeslot', 'testdb');
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# In case wait is true we are waiting for an active pid on the 'activeslot'
slot.
+# If wait is not true it means we are testing a known failure scenario.
+sub make_slot_active
+{
+ my $wait = shift;
+ my $slot_user_handle;
+
+ print "starting pg_recvlogical\n";
+ $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d',
$node_standby->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop',
'--start'], '>', \$stdout, '2>', \$stderr);
+
+ if ($wait)
+ # make sure activeslot is in use
+ {
+ $node_standby->poll_query_until('testdb',
+ "SELECT EXISTS (SELECT 1 FROM pg_replication_slots
WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+ ) or die "slot never became active";
+ }
+
+ return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+ my ($slot_user_handle, $check_stderr) = @_;
+ my $return;
+
+ # our client should've terminated in response to the walsender error
+ $slot_user_handle->finish;
+ $return = $?;
+ cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+ if ($return) {
+ like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+ }
+
+ return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+ my ($slot_user_handle) = @_;
+
+ is($node_standby->slot('inactiveslot')->{'slot_type'}, '',
'inactiveslot on standby dropped');
+ is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on
standby dropped');
+
+ check_pg_recvlogical_stderr($slot_user_handle, "conflict with
recovery");
+}
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM
pg_create_physical_replication_slot('$primary_slotname');]);
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+ $node_primary, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+ qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+create_logical_slots();
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y
text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s,
s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+my $result = $node_standby->safe_psql('testdb',
+ qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+ 14, 'Decoding produced 14 rows');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM
generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+ qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+ "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL)
ORDER BY lsn DESC LIMIT 1;"
+);
+print "waiting to replay $endpos\n";
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM
generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+ 'testdb', 'activeslot', $endpos, 180,
+ 'include-xids' => '0',
+ 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+ 'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+ "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name =
'activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+ 'testdb', 'activeslot', $endpos, 180,
+ 'include-xids' => '0',
+ 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+ 'otherdb',
+ "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL,
NULL) ORDER BY lsn DESC LIMIT 1;"
+ ),
+ 3,
+ 'replaying logical slot from another database fails');
+
+# drop the logical slots
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('activeslot')]);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+create_logical_slots();
+
+# One way to reproduce recovery conflict is to run VACUUM FULL with
+# hot_standby_feedback turned off on the standby.
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_standby->restart;
+# ensure walreceiver feedback off by waiting for expected xmin and
+# catalog_xmin on primary. Both should be NULL since hs_feedback is off
+wait_for_xmins($node_primary, $primary_slotname,
+ "xmin IS NULL AND catalog_xmin IS NULL");
+
+$handle = make_slot_active(1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', 'VACUUM FULL');
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+ $node_standby,
+ "invalidating slot \"inactiveslot\" because it conflicts with recovery"),
+ 'inactiveslot slot invalidation is logged with vacuum FULL');
+
+ok( find_in_log(
+ $node_standby,
+ "invalidating slot \"activeslot\" because it conflicts with recovery"),
+ 'activeslot slot invalidation is logged with vacuum FULL');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
updated
+ok( $node_standby->poll_query_until(
+ 'postgres',
+ "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts
where datname = 'testdb'", 't'),
+ 'confl_active_logicalslot updated') or die "Timed out waiting
confl_active_logicalslot to be updated";
+
+$handle = make_slot_active(0);
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication
slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on primary. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_primary, $primary_slotname,
+ "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('activeslot')]);
+
+create_logical_slots();
+
+# One way to produce recovery conflict is to create/drop a relation and launch
a vacuum
+# with hot_standby_feedback turned off on the standby.
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_standby->restart;
+# ensure walreceiver feedback off by waiting for expected xmin and
+# catalog_xmin on primary. Both should be NULL since hs_feedback is off
+wait_for_xmins($node_primary, $primary_slotname,
+ "xmin IS NULL AND catalog_xmin IS NULL");
+
+$handle = make_slot_active(1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y
text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM');
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+ $node_standby,
+ "invalidating slot \"inactiveslot\" because it conflicts with recovery",
$logstart),
+ 'inactiveslot slot invalidation is logged due to row removal');
+
+ok( find_in_log(
+ $node_standby,
+ "invalidating slot \"activeslot\" because it conflicts with recovery",
$logstart),
+ 'activeslot slot invalidation is logged due to row removal');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
updated
+# We now expect 2 conflicts reported, as the counter persists across restarts.
+ok( $node_standby->poll_query_until(
+ 'postgres',
+ "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts
where datname = 'testdb'", 't'),
+ 'confl_active_logicalslot updated') or die "Timed out waiting
confl_active_logicalslot to be updated";
+
+$handle = make_slot_active(0);
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication
slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on primary. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_primary, $primary_slotname,
+ "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 3: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('activeslot')]);
+
+create_logical_slots();
+
+$handle = make_slot_active(1);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+ $node_standby,
+ "invalidating slot \"inactiveslot\" because it conflicts with recovery",
$logstart),
+ 'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+ $node_standby,
+ "invalidating slot \"activeslot\" because it conflicts with recovery",
$logstart),
+ 'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been
updated
+# We now expect 3 conflicts reported, as the counter persists across restarts.
+ok( $node_standby->poll_query_until(
+ 'postgres',
+ "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts
where datname = 'testdb'", 't'),
+ 'confl_active_logicalslot updated') or die "Timed out waiting
confl_active_logicalslot to be updated";
+
+$handle = make_slot_active(0);
+# We are not able to read from the slot as it requires wal_level at least
logical on master
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires
wal_level to be at least logical on master");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+$handle = make_slot_active(0);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication
slot \"activeslot\"");
+
+##################################################
+# DROP DATABASE should drop its slots, including active slots.
+##################################################
+
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('activeslot')]);
+create_logical_slots();
+$handle = make_slot_active(1);
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot',
'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+is($node_standby->safe_psql('postgres',
+ q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]),
'f',
+ 'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+ 'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT
pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y
text);]);
+
+# create the logical slots
+create_logical_slots();
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM
generate_series(1,4) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('flush'));
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM
generate_series(5,7) s;]
+);
+
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+ qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on
promoted standby');
--
2.34.1
From e482640d28a9460d24f722ccfaabf6171e24c9f8 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 08:36:32 +0000
Subject: [PATCH v33 3/6] Allow logical decoding on standby.
Allow a logical slot to be created on standby. Restrict its usage
or its creation if wal_level on primary is less than logical.
During slot creation, its restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on standby waits for an
xl_running_xacts record to arrive from primary. Conflicting slots
would be handled in next commits.
Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes
Mello
---
src/backend/access/transam/xlog.c | 11 ++++
src/backend/replication/logical/decode.c | 22 ++++++-
src/backend/replication/logical/logical.c | 37 +++++++-----
src/backend/replication/slot.c | 73 +++++++++++++++--------
src/backend/replication/walsender.c | 27 +++++----
src/include/access/xlog.h | 1 +
6 files changed, 118 insertions(+), 53 deletions(-)
4.5% src/backend/access/transam/
36.6% src/backend/replication/logical/
57.9% src/backend/replication/
diff --git a/src/backend/access/transam/xlog.c
b/src/backend/access/transam/xlog.c
index fca6ee4584..f9cc842a6a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4462,6 +4462,17 @@ LocalProcessControlFile(bool reset)
ReadControlFile();
}
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+ return ControlFile->wal_level;
+}
+
/*
* Initialization of shared memory for XLOG
*/
diff --git a/src/backend/replication/logical/decode.c
b/src/backend/replication/logical/decode.c
index 2cc0ac9eb0..c210721ab0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer
*buf)
* can restart from there.
*/
break;
+ case XLOG_PARAMETER_CHANGE:
+ {
+ xl_parameter_change *xlrec =
+ (xl_parameter_change *)
XLogRecGetData(buf->record);
+
+ /*
+ * If wal_level on primary is reduced to less than
logical, then we
+ * want to prevent existing logical slots from being
used.
+ * Existing logical slots on standby get invalidated
when this WAL
+ * record is replayed; and further, slot creation fails
when the
+ * wal level is not sufficient; but all these
operations are not
+ * synchronized, so a logical slot may creep in while
the wal_level
+ * is being reduced. Hence this extra check.
+ */
+ if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on
standby requires "
+ "wal_level to
be at least logical on master")));
+ break;
+ }
case XLOG_NOOP:
case XLOG_NEXTOID:
case XLOG_SWITCH:
case XLOG_BACKUP_END:
- case XLOG_PARAMETER_CHANGE:
case XLOG_RESTORE_POINT:
case XLOG_FPW_CHANGE:
case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c
b/src/backend/replication/logical/logical.c
index 625a7f4273..a9567f2d8c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -119,23 +119,22 @@ CheckLogicalDecodingRequirements(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding requires a database
connection")));
- /* ----
- * TODO: We got to change that someday soon...
- *
- * There's basically three things missing to allow this:
- * 1) We need to be able to correctly and quickly identify the timeline
a
- * LSN belongs to
- * 2) We need to force hot_standby_feedback to be enabled at all times
so
- * the primary cannot remove rows we need.
- * 3) support dropping replication slots referring to a database, in
- * dbase_redo. There can't be any active ones due to HS recovery
- * conflicts, so that should be relatively easy.
- * ----
- */
if (RecoveryInProgress())
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("logical decoding cannot be used while
in recovery")));
+ {
+ /*
+ * This check may have race conditions, but whenever
+ * XLOG_PARAMETER_CHANGE indicates that wal_level has changed,
we
+ * verify that there are no existing logical replication slots.
And to
+ * avoid races around creating a new slot,
+ * CheckLogicalDecodingRequirements() is called once before
creating
+ * the slot, and once when logical decoding is initially
starting up.
+ */
+ if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on standby
requires "
+ "wal_level to be at
least logical on master")));
+ }
}
/*
@@ -331,6 +330,12 @@ CreateInitDecodingContext(const char *plugin,
LogicalDecodingContext *ctx;
MemoryContext old_context;
+ /*
+ * On standby, this check is also required while creating the slot.
See
+ * the comments in CheckLogicalDecodingRequirements().
+ */
+ CheckLogicalDecodingRequirements();
+
/* shorter lines... */
slot = MyReplicationSlot;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6a4e2cd19b..f554dac6fd 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -51,6 +51,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "access/xlogrecovery.h"
/*
* Replication slot on-disk data structure.
@@ -1175,37 +1176,46 @@ ReplicationSlotReserveWal(void)
/*
* For logical slots log a standby snapshot and start logical
decoding
* at exactly that position. That allows the slot to start up
more
- * quickly.
+ * quickly. But on a standby we cannot do WAL writes, so just
use the
+ * replay pointer; effectively, an attempt to create a logical
slot on
+ * standby will cause it to wait for an xl_running_xact record
to be
+ * logged independently on the primary, so that a snapshot can
be built
+ * using the record.
*
- * That's not needed (or indeed helpful) for physical slots as
they'll
- * start replay at the last logged checkpoint anyway. Instead
return
- * the location of the last redo LSN. While that slightly
increases
- * the chance that we have to retry, it's where a base backup
has to
- * start replay at.
+ * None of this is needed (or indeed helpful) for physical
slots as
+ * they'll start replay at the last logged checkpoint anyway.
Instead
+ * return the location of the last redo LSN. While that slightly
+ * increases the chance that we have to retry, it's where a
base backup
+ * has to start replay at.
*/
- if (!RecoveryInProgress() && SlotIsLogical(slot))
+ if (SlotIsPhysical(slot))
+ restart_lsn = GetRedoRecPtr();
+ else if (RecoveryInProgress())
{
- XLogRecPtr flushptr;
-
- /* start at current insert position */
- restart_lsn = GetXLogInsertRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
-
- /* make sure we have enough information to start */
- flushptr = LogStandbySnapshot();
-
- /* and make sure it's fsynced to disk */
- XLogFlush(flushptr);
+ restart_lsn = GetXLogReplayRecPtr(NULL);
+ /*
+ * Replay pointer may point one past the end of the
record. If that
+ * is an XLOG page boundary, it will not be a valid LSN
for the
+ * start of a record, so bump it up past the page
header.
+ */
+ if (!XRecOffIsValid(restart_lsn))
+ {
+ if (restart_lsn % XLOG_BLCKSZ != 0)
+ elog(ERROR, "invalid replay pointer");
+
+ /* For the first page of a segment file, it's a
long header */
+ if (XLogSegmentOffset(restart_lsn,
wal_segment_size) == 0)
+ restart_lsn += SizeOfXLogLongPHD;
+ else
+ restart_lsn += SizeOfXLogShortPHD;
+ }
}
else
- {
- restart_lsn = GetRedoRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
- }
+ restart_lsn = GetXLogInsertRecPtr();
+
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
/* prevent WAL removal as fast as possible */
ReplicationSlotsComputeRequiredLSN();
@@ -1221,6 +1231,17 @@ ReplicationSlotReserveWal(void)
if (XLogGetLastRemovedSegno() < segno)
break;
}
+
+ if (!RecoveryInProgress() && SlotIsLogical(slot))
+ {
+ XLogRecPtr flushptr;
+
+ /* make sure we have enough information to start */
+ flushptr = LogStandbySnapshot();
+
+ /* and make sure it's fsynced to disk */
+ XLogFlush(flushptr);
+ }
}
/*
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index 64fbd52e34..9662e316c9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,14 +906,18 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr
targetPagePtr, int req
int count;
WALReadError errinfo;
XLogSegNo segno;
- TimeLineID currTLI = GetWALInsertionTimeLine();
+ TimeLineID currTLI;
/*
- * Since logical decoding is only permitted on a primary server, we know
- * that the current timeline ID can't be changing any more. If we did
this
- * on a standby, we'd have to worry about the values we compute here
- * becoming invalid due to a promotion or timeline change.
+ * Since logical decoding is also permitted on a standby server, we need
+ * to check if the server is in recovery to decide how to get the
current
+ * timeline ID (so that it also covers the promotion or timeline change
cases).
*/
+ if (!RecoveryInProgress())
+ currTLI = GetWALInsertionTimeLine();
+ else
+ GetXLogReplayRecPtr(&currTLI);
+
XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
sendTimeLineIsHistoric = (state->currTLI != currTLI);
sendTimeLine = state->currTLI;
@@ -3074,10 +3078,12 @@ XLogSendLogical(void)
* If first time through in this session, initialize flushPtr.
Otherwise,
* we only need to update flushPtr if EndRecPtr is past it.
*/
- if (flushPtr == InvalidXLogRecPtr)
- flushPtr = GetFlushRecPtr(NULL);
- else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
- flushPtr = GetFlushRecPtr(NULL);
+ if (flushPtr == InvalidXLogRecPtr ||
+ logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+ {
+ flushPtr = (am_cascading_walsender ?
+ GetStandbyFlushRecPtr(NULL) :
GetFlushRecPtr(NULL));
+ }
/* If EndRecPtr is still past our flushPtr, it means we caught up. */
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3168,7 +3174,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
- *tli = replayTLI;
+ if (tli)
+ *tli = replayTLI;
result = replayPtr;
if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1fbd48fbda..027e155e8e 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
extern void BootStrapXLOG(void);
extern void InitializeWalConsistencyChecking(void);
extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
extern void StartupXLOG(void);
extern void ShutdownXLOG(int code, Datum arg);
extern void CreateCheckPoint(int flags);
--
2.34.1
From 837de702ef25df5d9c0e185a8f1b8644a114b2bb Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 08:35:39 +0000
Subject: [PATCH v33 2/6] Handle logical slot conflicts on standby.
During WAL replay on standby, when a slot conflict is identified,
invalidate the conflicting slots. Do the same if wal_level on the
primary is reduced to below logical while logical slots exist
on the standby. Introduce a new ProcSignalReason value for slot
conflict recovery. Add a new pg_stat_database_conflicts field:
confl_active_logicalslot.
Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes
Mello
---
doc/src/sgml/monitoring.sgml | 11 +
src/backend/access/gist/gistxlog.c | 2 +
src/backend/access/hash/hash_xlog.c | 1 +
src/backend/access/heap/heapam.c | 3 +
src/backend/access/nbtree/nbtxlog.c | 2 +
src/backend/access/spgist/spgxlog.c | 1 +
src/backend/access/transam/xlog.c | 13 ++
src/backend/catalog/system_views.sql | 3 +-
.../replication/logical/logicalfuncs.c | 7 +-
src/backend/replication/slot.c | 209 ++++++++++++++++++
src/backend/replication/walsender.c | 8 +
src/backend/storage/ipc/procarray.c | 4 +
src/backend/storage/ipc/procsignal.c | 3 +
src/backend/storage/ipc/standby.c | 13 +-
src/backend/tcop/postgres.c | 22 ++
src/backend/utils/activity/pgstat_database.c | 4 +
src/backend/utils/adt/pgstatfuncs.c | 3 +
src/include/catalog/pg_proc.dat | 5 +
src/include/pgstat.h | 1 +
src/include/replication/slot.h | 2 +
src/include/storage/procsignal.h | 1 +
src/include/storage/standby.h | 2 +
src/test/regress/expected/rules.out | 3 +-
23 files changed, 318 insertions(+), 5 deletions(-)
3.8% doc/src/sgml/
5.2% src/backend/access/transam/
3.8% src/backend/access/
3.9% src/backend/replication/logical/
58.2% src/backend/replication/
7.0% src/backend/storage/ipc/
7.9% src/backend/tcop/
3.3% src/backend/
5.7% src/include/
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 363b183e5f..27235418a6 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4317,6 +4317,17 @@ SELECT pid, wait_event_type, wait_event FROM
pg_stat_activity WHERE wait_event i
deadlocks
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of active logical slots in this database that have been
+ invalidated because they conflict with recovery (note that inactive ones
+ are also invalidated but do not increment this counter)
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/src/backend/access/gist/gistxlog.c
b/src/backend/access/gist/gistxlog.c
index 6e260f9aba..235776c5d3 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -196,6 +196,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+
xldata->logicalConflictPossible,
rlocator);
}
@@ -396,6 +397,7 @@ gistRedoPageReuse(XLogReaderState *record)
*/
if (InHotStandby)
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+
xlrec->logicalConflictPossible,
xlrec->locator);
}
diff --git a/src/backend/access/hash/hash_xlog.c
b/src/backend/access/hash/hash_xlog.c
index b452697a2f..f3dd5ae082 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1001,6 +1001,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+
xldata->logicalConflictPossible,
rlocator);
}
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 383fd76918..ba72b57ece 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8694,6 +8694,7 @@ heap_xlog_prune(XLogReaderState *record)
*/
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+
xlrec->logicalConflictPossible,
rlocator);
/*
@@ -8863,6 +8864,7 @@ heap_xlog_visible(XLogReaderState *record)
*/
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+
xlrec->flags & VISIBILITYMAP_ON_CATALOG_ACCESSIBLE_IN_LOGICAL_DECODING,
rlocator);
/*
@@ -9118,6 +9120,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+
xlrec->logicalConflictPossible,
rlocator);
}
diff --git a/src/backend/access/nbtree/nbtxlog.c
b/src/backend/access/nbtree/nbtxlog.c
index 3e311a98a6..ff4cd9f5e9 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+
xlrec->logicalConflictPossible,
rlocator);
}
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
if (InHotStandby)
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+
xlrec->logicalConflictPossible,
xlrec->locator);
}
diff --git a/src/backend/access/spgist/spgxlog.c
b/src/backend/access/spgist/spgxlog.c
index 44adc2098f..ac4ec394eb 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+
xldata->logicalConflictPossible,
locator);
}
diff --git a/src/backend/access/transam/xlog.c
b/src/backend/access/transam/xlog.c
index 91473b00d9..fca6ee4584 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7958,6 +7958,19 @@ xlog_redo(XLogReaderState *record)
/* Update our copy of the parameters in pg_control */
memcpy(&xlrec, XLogRecGetData(record),
sizeof(xl_parameter_change));
+ /*
+ * Invalidate logical slots if we are in hot standby and the
primary does not
+ * have a WAL level sufficient for logical decoding. No need to
search
+ * for potentially conflicting logical slots if standby is
running
+ * with wal_level lower than logical, because in that case, we
would
+ * have either disallowed creation of logical slots or
invalidated existing
+ * ones.
+ */
+ if (InRecovery && InHotStandby &&
+ xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+ wal_level >= WAL_LEVEL_LOGICAL)
+
InvalidateConflictingLogicalReplicationSlots(InvalidOid,InvalidTransactionId);
+
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->MaxConnections = xlrec.MaxConnections;
ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/catalog/system_views.sql
b/src/backend/catalog/system_views.sql
index 2d8104b090..0e0b8ef415 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1065,7 +1065,8 @@ CREATE VIEW pg_stat_database_conflicts AS
pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
- pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+ pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+ pg_stat_get_db_conflict_logicalslot(D.oid) AS
confl_active_logicalslot
FROM pg_database D;
CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/logical/logicalfuncs.c
b/src/backend/replication/logical/logicalfuncs.c
index 5c23178570..8432de219b 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -216,11 +216,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo,
bool confirm, bool bin
/*
* After the sanity checks in CreateDecodingContext, make sure
the
- * restart_lsn is valid. Avoid "cannot get changes" wording in
this
+ * restart_lsn is valid or both xmin and catalog_xmin are valid.
+ * Avoid "cannot get changes" wording in this
* errmsg because that'd be confusingly ambiguous about no
changes
* being available.
*/
- if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
+ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)
+ || (!TransactionIdIsValid(MyReplicationSlot->data.xmin)
+ &&
!TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin)))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("can no longer get changes from
replication slot \"%s\"",
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 899acfd912..6a4e2cd19b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1432,6 +1432,215 @@ restart:
return invalidated;
}
+/*
+ * Helper for InvalidateConflictingLogicalReplicationSlots -- acquires the
given slot
+ * and marks it invalid, if necessary and possible.
+ *
+ * Returns whether ReplicationSlotControlLock was released in the interim (and
+ * in that case we're not holding the lock at return, otherwise we are).
+ *
+ * This is inherently racy, because we release the LWLock
+ * for syscalls, so caller must restart if we return true.
+ */
+static bool
+InvalidatePossiblyConflictingLogicalReplicationSlot(ReplicationSlot *s,
TransactionId xid)
+{
+ int last_signaled_pid = 0;
+ bool released_lock = false;
+
+ for (;;)
+ {
+ TransactionId slot_xmin;
+ TransactionId slot_catalog_xmin;
+ NameData slotname;
+ int active_pid = 0;
+
+ Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
LW_SHARED));
+
+ if (!s->in_use)
+ {
+ if (released_lock)
+ LWLockRelease(ReplicationSlotControlLock);
+ break;
+ }
+
+ /*
+ * Check if the slot needs to be invalidated. If it needs to be
+ * invalidated, and is not currently acquired, acquire it and
mark it
+ * as having been invalidated. We do this with the spinlock
held to
+ * avoid race conditions -- for example the xmin(s) could move
forward,
+ * or the slot could be dropped.
+ */
+ SpinLockAcquire(&s->mutex);
+
+ slot_xmin = s->data.xmin;
+ slot_catalog_xmin = s->data.catalog_xmin;
+
+ /*
+ * If the slot is already invalid or is not conflicting, we
don't need to
+ * do anything.
+ */
+
+ /* slot has been invalidated */
+ if ((!TransactionIdIsValid(slot_xmin) &&
!TransactionIdIsValid(slot_catalog_xmin))
+ ||
+ /*
+ * we are not forcing for invalidation because the xid is valid
+ * and this is a non conflicting slot
+ */
+ (TransactionIdIsValid(xid) && !(
+ (TransactionIdIsValid(slot_xmin) &&
TransactionIdPrecedesOrEquals(slot_xmin, xid))
+ ||
+ (TransactionIdIsValid(slot_catalog_xmin) &&
TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+ ))
+ )
+ {
+ SpinLockRelease(&s->mutex);
+ if (released_lock)
+ LWLockRelease(ReplicationSlotControlLock);
+ break;
+ }
+
+ slotname = s->data.name;
+ active_pid = s->active_pid;
+
+ /*
+ * If the slot can be acquired, do so and mark it invalidated
+ * immediately. Otherwise we'll signal the owning process,
below, and
+ * retry.
+ */
+ if (active_pid == 0)
+ {
+ MyReplicationSlot = s;
+ s->active_pid = MyProcPid;
+ s->data.xmin = InvalidTransactionId;
+ s->data.catalog_xmin = InvalidTransactionId;
+ }
+
+ SpinLockRelease(&s->mutex);
+
+ if (active_pid != 0)
+ {
+ /*
+ * Prepare the sleep on the slot's condition variable
before
+ * releasing the lock, to close a possible race
condition if the
+ * slot is released before the sleep below.
+ */
+
+ ConditionVariablePrepareToSleep(&s->active_cv);
+
+ LWLockRelease(ReplicationSlotControlLock);
+ released_lock = true;
+
+ /*
+ * Signal to terminate the process that owns the slot,
if we
+ * haven't already signalled it. (Avoidance of repeated
+ * signalling is the only reason for there to be a loop
in this
+ * routine; otherwise we could rely on caller's restart
loop.)
+ *
+ * There is the race condition that other process may
own the slot
+ * after its current owner process is terminated and
before this
+ * process owns it. To handle that, we signal only if
the PID of
+ * the owning process has changed from the previous
time. (This
+ * logic assumes that the same PID is not reused very
quickly.)
+ */
+ if (last_signaled_pid != active_pid)
+ {
+ ereport(LOG,
+ (errmsg("terminating process %d
because replication slot \"%s\" conflicts with recovery",
+ active_pid,
NameStr(slotname))));
+
+ (void) SendProcSignal(active_pid,
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
+ last_signaled_pid = active_pid;
+ }
+
+ /* Wait until the slot is released. */
+ ConditionVariableSleep(&s->active_cv,
+
WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+ /*
+ * Re-acquire lock and start over; we expect to
invalidate the
+ * slot next time (unless another process acquires the
slot in the
+ * meantime).
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ continue;
+ }
+ else
+ {
+ /*
+ * We hold the slot now and have already invalidated
it; flush it
+ * to ensure that state persists.
+ *
+ * Don't want to hold ReplicationSlotControlLock across
file
+ * system operations, so release it now but be sure to
tell caller
+ * to restart from scratch.
+ */
+ LWLockRelease(ReplicationSlotControlLock);
+ released_lock = true;
+
+ /* Make sure the invalidated state persists across
server restart */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ ReplicationSlotRelease();
+ pgstat_drop_replslot(s);
+
+ ereport(LOG,
+ (errmsg("invalidating slot \"%s\"
because it conflicts with recovery", NameStr(slotname))));
+
+ /* done with this slot for now */
+ break;
+ }
+ }
+
+ Assert(!released_lock ==
LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+ return released_lock;
+}
+
+/*
+ * Resolve recovery conflicts with logical slots.
+ *
+ * When xid is valid, it means that we are about to remove rows older than xid.
+ * Therefore we need to invalidate slots that depend on seeing those rows.
+ * When xid is invalid, invalidate all logical slots. This is required when the
+ * master wal_level is set back to replica, so existing logical slots need to
+ * be invalidated.
+ */
+void
+InvalidateConflictingLogicalReplicationSlots(Oid dboid, TransactionId xid)
+{
+
+ Assert(max_replication_slots >= 0);
+
+ if (max_replication_slots == 0)
+ return;
+restart:
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (!s->in_use)
+ continue;
+
+ /* We are only dealing with *logical* slot conflicts. */
+ if (!SlotIsLogical(s))
+ continue;
+
+ /* not our database and we don't want all the database, skip */
+ if (s->data.database != dboid && TransactionIdIsValid(xid))
+ continue;
+
+ if (InvalidatePossiblyConflictingLogicalReplicationSlot(s, xid))
+ {
+ /* if the lock was released, we need to restart from
scratch */
+ goto restart;
+ }
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+}
+
/*
* Flush all replication slots to disk.
*
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index c11bb3716f..64fbd52e34 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1253,6 +1253,14 @@ StartLogicalReplication(StartReplicationCmd *cmd)
ReplicationSlotAcquire(cmd->slotname, true);
+ if (!TransactionIdIsValid(MyReplicationSlot->data.xmin)
+ && !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin))
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot read from logical replication
slot \"%s\"",
+ cmd->slotname),
+ errdetail("This slot has been invalidated
because it was conflicting with recovery.")));
+
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
diff --git a/src/backend/storage/ipc/procarray.c
b/src/backend/storage/ipc/procarray.c
index 0176f30270..d68b752c91 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -3477,6 +3477,10 @@ SignalVirtualTransaction(VirtualTransactionId vxid,
ProcSignalReason sigmode,
GET_VXID_FROM_PGPROC(procvxid, *proc);
+ /*
+ * Note: vxid.localTransactionId can be invalid, which means the
+ * request is to signal the pid that is not running a
transaction.
+ */
if (procvxid.backendId == vxid.backendId &&
procvxid.localTransactionId == vxid.localTransactionId)
{
diff --git a/src/backend/storage/ipc/procsignal.c
b/src/backend/storage/ipc/procsignal.c
index 7767657f27..1b3bf943c1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -669,6 +669,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
+ if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
diff --git a/src/backend/storage/ipc/standby.c
b/src/backend/storage/ipc/standby.c
index f43229dfda..1afd119e01 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -35,6 +35,7 @@
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+#include "replication/slot.h"
/* User-settable GUC parameters */
int vacuum_defer_cleanup_age;
@@ -475,6 +476,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId
*waitlist,
*/
void
ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+ bool
logicalConflictPossible,
RelFileLocator locator)
{
VirtualTransactionId *backends;
@@ -499,6 +501,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId
snapshotConflictHorizon,
PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
true);
+
+ if (logicalConflictPossible)
+ InvalidateConflictingLogicalReplicationSlots(locator.dbOid,
snapshotConflictHorizon);
}
/*
@@ -507,6 +512,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId
snapshotConflictHorizon,
*/
void
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId
snapshotConflictHorizon,
+
bool logicalConflictPossible,
RelFileLocator locator)
{
/*
@@ -525,7 +531,9 @@
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
TransactionId truncated;
truncated = XidFromFullTransactionId(snapshotConflictHorizon);
- ResolveRecoveryConflictWithSnapshot(truncated, locator);
+ ResolveRecoveryConflictWithSnapshot(truncated,
+
logicalConflictPossible,
+
locator);
}
}
@@ -1486,6 +1494,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
reasonDesc = _("recovery conflict on snapshot");
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ reasonDesc = _("recovery conflict on replication slot");
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
reasonDesc = _("recovery conflict on buffer deadlock");
break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 01d264b5ab..05da83bf5b 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2482,6 +2482,9 @@ errdetail_recovery_conflict(void)
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
errdetail("User query might have needed to see row
versions that must be removed.");
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ errdetail("User was using the logical slot that must be
dropped.");
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
errdetail("User transaction caused buffer deadlock with
recovery.");
break;
@@ -3051,6 +3054,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
case PROCSIG_RECOVERY_CONFLICT_LOCK:
case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ /*
+ * For conflicts that require a logical slot to
be invalidated, the
+ * requirement is for the signal receiver to
release the slot,
+ * so that it could be invalidated by the
signal sender. So for
+ * normal backends, the transaction should be
aborted, just
+ * like for other recovery conflicts. But if
it's walsender on
+ * standby, then it has to be killed so as to
release an
+ * acquired logical slot.
+ */
+ if (am_cascading_walsender &&
+ reason ==
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
+ MyReplicationSlot &&
SlotIsLogical(MyReplicationSlot))
+ {
+ RecoveryConflictPending = true;
+ QueryCancelPending = true;
+ InterruptPending = true;
+ break;
+ }
/*
* If we aren't in a transaction any longer
then ignore.
diff --git a/src/backend/utils/activity/pgstat_database.c
b/src/backend/utils/activity/pgstat_database.c
index 290086fc22..7a8909d8b9 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
dbentry->conflict_bufferpin++;
break;
+ case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+ dbentry->conflict_logicalslot++;
+ break;
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
dbentry->conflict_startup_deadlock++;
break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool
nowait)
PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
PGSTAT_ACCUM_DBCOUNT(conflict_lock);
PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
+ PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
diff --git a/src/backend/utils/adt/pgstatfuncs.c
b/src/backend/utils/adt/pgstatfuncs.c
index 46f98fd67f..41eb6256ea 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1055,6 +1055,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
/* pg_stat_get_db_xact_rollback */
PG_STAT_GET_DBENTRY_INT64(xact_rollback)
+/* pg_stat_get_db_conflict_logicalslot */
+PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
Datum
pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
@@ -1088,6 +1090,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
result = (int64) (dbentry->conflict_tablespace +
dbentry->conflict_lock +
dbentry->conflict_snapshot +
+ dbentry->conflict_logicalslot
+
dbentry->conflict_bufferpin +
dbentry->conflict_startup_deadlock);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 98d90d9338..21dd65a483 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5546,6 +5546,11 @@
proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '9901',
+ descr => 'statistics: recovery conflicts in database caused by logical
replication slot',
+ proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+ proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+ prosrc => 'pg_stat_get_db_conflict_logicalslot' },
{ oid => '3068',
descr => 'statistics: recovery conflicts in database caused by shared buffer
pin',
proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index a3df8d27c3..7ffce84d07 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -291,6 +291,7 @@ typedef struct PgStat_StatDBEntry
PgStat_Counter conflict_tablespace;
PgStat_Counter conflict_lock;
PgStat_Counter conflict_snapshot;
+ PgStat_Counter conflict_logicalslot;
PgStat_Counter conflict_bufferpin;
PgStat_Counter conflict_startup_deadlock;
PgStat_Counter temp_files;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 65f2c74239..0ed1d8af28 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -216,6 +216,7 @@ extern XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern void InvalidateConflictingLogicalReplicationSlots(Oid dboid,
TransactionId xid);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool
need_lock);
extern int ReplicationSlotIndex(ReplicationSlot *slot);
extern bool ReplicationSlotName(int index, Name name);
@@ -227,5 +228,6 @@ extern void CheckPointReplicationSlots(void);
extern void CheckSlotRequirements(void);
extern void CheckSlotPermissions(void);
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId
xid, char *reason);
#endif /* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index ee636900f3..56096bd3e2 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -41,6 +41,7 @@ typedef enum
PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
PROCSIG_RECOVERY_CONFLICT_LOCK,
PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index e46c934c56..2f62fe5fc8 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
extern void ShutdownRecoveryTransactionEnvironment(void);
extern void ResolveRecoveryConflictWithSnapshot(TransactionId
snapshotConflictHorizon,
+
bool logicalConflictPossible,
RelFileLocator locator);
extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId
snapshotConflictHorizon,
+
bool logicalConflictPossible,
RelFileLocator locator);
extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
diff --git a/src/test/regress/expected/rules.out
b/src/test/regress/expected/rules.out
index fb9f936d43..1cc62c447d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1868,7 +1868,8 @@ pg_stat_database_conflicts| SELECT d.oid AS datid,
pg_stat_get_db_conflict_lock(d.oid) AS confl_lock,
pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot,
pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin,
- pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock
+ pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock,
+ pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_active_logicalslot
FROM pg_database d;
pg_stat_gssapi| SELECT s.pid,
s.gss_auth AS gss_authenticated,
--
2.34.1
From d0dfaf6e93f4771d849073abcab5e65aac0be921 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 08:32:04 +0000
Subject: [PATCH v33 1/6] Add info in WAL records in preparation for logical
slot conflict handling.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Overall design:
1. We want to enable logical decoding on standbys, but replay of WAL
from the primary might remove data that is needed by logical decoding,
causing replication conflicts much as hot standby does.
2. Our chosen strategy for dealing with this type of replication slot
is to invalidate logical slots for which needed data has been removed.
3. To do this we need the snapshotConflictHorizon (formerly known as
latestRemovedXid) for each change, just as we do for physical
replication conflicts, but we also need to know whether any particular
change was to data that logical replication might access.
4. We can't rely on the standby's relcache entries for this purpose in
any way, because the WAL record that causes the problem might be
replayed before the standby even reaches consistency.
5. Therefore every WAL record that potentially removes data from the
index or heap must carry a flag indicating whether or not it is one
that might be accessed during logical decoding.
Why do we need this for logical decoding on standby?
First, let's forget about logical decoding on standby and recall that
on a primary database, any catalog rows that may be needed by a logical
decoding replication slot are not removed.
This is done thanks to the catalog_xmin associated with the logical
replication slot.
But, with logical decoding on standby, in the following cases:
- hot_standby_feedback is off
- hot_standby_feedback is on but there is no physical slot between
the primary and the standby. Then, hot_standby_feedback will work,
but only while the connection is alive (for example a node restart
would break it)
Then, the primary may delete system catalog rows that could be needed
by the logical decoding on the standby (as it does not know about the
catalog_xmin on the standby).
So, it’s mandatory to identify those rows and invalidate the slots
that may need them if any. Identifying those rows is the purpose of
this commit.
Implementation:
When a WAL replay on standby indicates that a catalog table tuple is
to be deleted by an xid that is greater than a logical slot's
catalog_xmin, then that means the slot's catalog_xmin conflicts with
the xid, and we need to handle the conflict. While subsequent commits
will do the actual conflict handling, this commit adds a new field
logicalConflictPossible in such WAL records (and a new bit set in the
xl_heap_visible flags field), that is true for catalog tables, so as to
arrange for conflict handling.
Author: Andres Freund (in an older version), Amit Khandekar, Bertrand
Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de
Royes Mello
---
contrib/test_decoding/expected/ddl.out | 65 +++++++++++++++++++++++++
contrib/test_decoding/sql/ddl.sql | 23 +++++++++
doc/src/sgml/catalogs.sgml | 11 +++++
src/backend/access/common/reloptions.c | 2 +-
src/backend/access/gist/gistxlog.c | 2 +
src/backend/access/hash/hashinsert.c | 2 +
src/backend/access/heap/heapam.c | 6 ++-
src/backend/access/heap/pruneheap.c | 2 +
src/backend/access/heap/visibilitymap.c | 3 +-
src/backend/access/nbtree/nbtpage.c | 4 ++
src/backend/access/spgist/spgvacuum.c | 2 +
src/backend/catalog/index.c | 10 ++--
src/backend/commands/tablecmds.c | 55 ++++++++++++++++++++-
src/include/access/gistxlog.h | 6 ++-
src/include/access/hash_xlog.h | 3 +-
src/include/access/heapam_xlog.h | 8 +--
src/include/access/nbtxlog.h | 6 ++-
src/include/access/spgxlog.h | 1 +
src/include/access/visibilitymapdefs.h | 9 ++--
src/include/catalog/pg_index.h | 2 +
src/include/utils/rel.h | 14 +++++-
21 files changed, 215 insertions(+), 21 deletions(-)
27.3% contrib/test_decoding/expected/
11.7% contrib/test_decoding/sql/
4.5% doc/src/sgml/
6.0% src/backend/access/heap/
4.6% src/backend/access/
3.0% src/backend/catalog/
15.9% src/backend/commands/
21.0% src/include/access/
4.6% src/include/utils/
diff --git a/contrib/test_decoding/expected/ddl.out
b/contrib/test_decoding/expected/ddl.out
index 9a28b5ddc5..48fb44c575 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -483,6 +483,7 @@ CREATE TABLE replication_metadata (
)
WITH (user_catalog_table = true)
;
+CREATE INDEX replication_metadata_idx1 on replication_metadata(relation);
\d+ replication_metadata
Table
"public.replication_metadata"
Column | Type | Collation | Nullable | Default
| Storage | Stats target | Description
@@ -492,11 +493,19 @@ WITH (user_catalog_table = true)
options | text[] | | |
| extended | |
Indexes:
"replication_metadata_pkey" PRIMARY KEY, btree (id)
+ "replication_metadata_idx1" btree (relation)
Options: user_catalog_table=true
+SELECT bool_and(indisusercatalog) from pg_index where indrelid =
'replication_metadata'::regclass;
+ bool_and
+----------
+ t
+(1 row)
+
INSERT INTO replication_metadata(relation, options)
VALUES ('foo', ARRAY['a', 'b']);
ALTER TABLE replication_metadata RESET (user_catalog_table);
+CREATE INDEX replication_metadata_idx2 on replication_metadata(relation);
\d+ replication_metadata
Table
"public.replication_metadata"
Column | Type | Collation | Nullable | Default
| Storage | Stats target | Description
@@ -506,10 +515,19 @@ ALTER TABLE replication_metadata RESET
(user_catalog_table);
options | text[] | | |
| extended | |
Indexes:
"replication_metadata_pkey" PRIMARY KEY, btree (id)
+ "replication_metadata_idx1" btree (relation)
+ "replication_metadata_idx2" btree (relation)
+
+SELECT bool_or(indisusercatalog) from pg_index where indrelid =
'replication_metadata'::regclass;
+ bool_or
+---------
+ f
+(1 row)
INSERT INTO replication_metadata(relation, options)
VALUES ('bar', ARRAY['a', 'b']);
ALTER TABLE replication_metadata SET (user_catalog_table = true);
+CREATE INDEX replication_metadata_idx3 on replication_metadata(relation);
\d+ replication_metadata
Table
"public.replication_metadata"
Column | Type | Collation | Nullable | Default
| Storage | Stats target | Description
@@ -519,15 +537,52 @@ ALTER TABLE replication_metadata SET (user_catalog_table
= true);
options | text[] | | |
| extended | |
Indexes:
"replication_metadata_pkey" PRIMARY KEY, btree (id)
+ "replication_metadata_idx1" btree (relation)
+ "replication_metadata_idx2" btree (relation)
+ "replication_metadata_idx3" btree (relation)
Options: user_catalog_table=true
+SELECT bool_and(indisusercatalog) from pg_index where indrelid =
'replication_metadata'::regclass;
+ bool_and
+----------
+ t
+(1 row)
+
INSERT INTO replication_metadata(relation, options)
VALUES ('blub', NULL);
+-- Also checking that indisusercatalog is set correctly when a table is
created with user_catalog_table = false
+CREATE TABLE replication_metadata_false (
+ id serial primary key,
+ relation name NOT NULL,
+ options text[]
+)
+WITH (user_catalog_table = false)
+;
+CREATE INDEX replication_metadata_false_idx1 on
replication_metadata_false(relation);
+\d+ replication_metadata_false
+ Table
"public.replication_metadata_false"
+ Column | Type | Collation | Nullable | Default
| Storage | Stats target | Description
+----------+---------+-----------+----------+--------------------------------------------------------+----------+--------------+-------------
+ id | integer | | not null |
nextval('replication_metadata_false_id_seq'::regclass) | plain |
|
+ relation | name | | not null |
| plain | |
+ options | text[] | | |
| extended | |
+Indexes:
+ "replication_metadata_false_pkey" PRIMARY KEY, btree (id)
+ "replication_metadata_false_idx1" btree (relation)
+Options: user_catalog_table=false
+
+SELECT bool_or(indisusercatalog) from pg_index where indrelid =
'replication_metadata_false'::regclass;
+ bool_or
+---------
+ f
+(1 row)
+
-- make sure rewrites don't work
ALTER TABLE replication_metadata ADD COLUMN rewritemeornot int;
ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text;
ERROR: cannot rewrite table "replication_metadata" used as a catalog table
ALTER TABLE replication_metadata SET (user_catalog_table = false);
+CREATE INDEX replication_metadata_idx4 on replication_metadata(relation);
\d+ replication_metadata
Table
"public.replication_metadata"
Column | Type | Collation | Nullable | Default
| Storage | Stats target | Description
@@ -538,8 +593,18 @@ ALTER TABLE replication_metadata SET (user_catalog_table =
false);
rewritemeornot | integer | | |
| plain | |
Indexes:
"replication_metadata_pkey" PRIMARY KEY, btree (id)
+ "replication_metadata_idx1" btree (relation)
+ "replication_metadata_idx2" btree (relation)
+ "replication_metadata_idx3" btree (relation)
+ "replication_metadata_idx4" btree (relation)
Options: user_catalog_table=false
+SELECT bool_or(indisusercatalog) from pg_index where indrelid =
'replication_metadata'::regclass;
+ bool_or
+---------
+ f
+(1 row)
+
INSERT INTO replication_metadata(relation, options)
VALUES ('zaphod', NULL);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
'include-xids', '0', 'skip-empty-xacts', '1');
diff --git a/contrib/test_decoding/sql/ddl.sql
b/contrib/test_decoding/sql/ddl.sql
index 4f76bed72c..51baac5c4e 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -276,29 +276,52 @@ CREATE TABLE replication_metadata (
)
WITH (user_catalog_table = true)
;
+
+CREATE INDEX replication_metadata_idx1 on replication_metadata(relation);
+
\d+ replication_metadata
+SELECT bool_and(indisusercatalog) from pg_index where indrelid =
'replication_metadata'::regclass;
INSERT INTO replication_metadata(relation, options)
VALUES ('foo', ARRAY['a', 'b']);
ALTER TABLE replication_metadata RESET (user_catalog_table);
+CREATE INDEX replication_metadata_idx2 on replication_metadata(relation);
\d+ replication_metadata
+SELECT bool_or(indisusercatalog) from pg_index where indrelid =
'replication_metadata'::regclass;
INSERT INTO replication_metadata(relation, options)
VALUES ('bar', ARRAY['a', 'b']);
ALTER TABLE replication_metadata SET (user_catalog_table = true);
+CREATE INDEX replication_metadata_idx3 on replication_metadata(relation);
\d+ replication_metadata
+SELECT bool_and(indisusercatalog) from pg_index where indrelid =
'replication_metadata'::regclass;
INSERT INTO replication_metadata(relation, options)
VALUES ('blub', NULL);
+-- Also checking that indisusercatalog is set correctly when a table is
created with user_catalog_table = false
+CREATE TABLE replication_metadata_false (
+ id serial primary key,
+ relation name NOT NULL,
+ options text[]
+)
+WITH (user_catalog_table = false)
+;
+
+CREATE INDEX replication_metadata_false_idx1 on
replication_metadata_false(relation);
+\d+ replication_metadata_false
+SELECT bool_or(indisusercatalog) from pg_index where indrelid =
'replication_metadata_false'::regclass;
+
-- make sure rewrites don't work
ALTER TABLE replication_metadata ADD COLUMN rewritemeornot int;
ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text;
ALTER TABLE replication_metadata SET (user_catalog_table = false);
+CREATE INDEX replication_metadata_idx4 on replication_metadata(relation);
\d+ replication_metadata
+SELECT bool_or(indisusercatalog) from pg_index where indrelid =
'replication_metadata'::regclass;
INSERT INTO replication_metadata(relation, options)
VALUES ('zaphod', NULL);
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 9316b811ac..459539b761 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -4447,6 +4447,17 @@ SCRAM-SHA-256$<replaceable><iteration
count></replaceable>:<replaceable>&l
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>indisusercatalog</structfield> <type>bool</type>
+ </para>
+ <para>
+ If true, the index is linked to a table that is declared as an
additional
+ catalog table for purposes of logical replication (that is, it has <link
linkend="sql-createtable"><literal>user_catalog_table</literal></link>
+ set to true).
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>indisreplident</structfield> <type>bool</type>
diff --git a/src/backend/access/common/reloptions.c
b/src/backend/access/common/reloptions.c
index 75b7344891..4b41f5e68d 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -120,7 +120,7 @@ static relopt_bool boolRelOpts[] =
RELOPT_KIND_HEAP,
AccessExclusiveLock
},
- false
+ HEAP_DEFAULT_USER_CATALOG_TABLE
},
{
{
diff --git a/src/backend/access/gist/gistxlog.c
b/src/backend/access/gist/gistxlog.c
index cb5affa3d2..6e260f9aba 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -608,6 +608,8 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno,
FullTransactionId deleteXid)
*/
/* XLOG stuff */
+ xlrec_reuse.logicalConflictPossible =
+ RelationIsAccessibleInLogicalDecoding(rel);
xlrec_reuse.locator = rel->rd_locator;
xlrec_reuse.block = blkno;
xlrec_reuse.snapshotConflictHorizon = deleteXid;
diff --git a/src/backend/access/hash/hashinsert.c
b/src/backend/access/hash/hashinsert.c
index 9a921e341e..18eb052280 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -432,6 +432,8 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer
metabuf, Buffer buf)
xl_hash_vacuum_one_page xlrec;
XLogRecPtr recptr;
+ xlrec.logicalConflictPossible =
+ RelationIsAccessibleInLogicalDecoding(hrel);
xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
xlrec.ntuples = ndeletable;
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 42756a9e6d..383fd76918 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -6827,6 +6827,8 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
snapshotConflictHorizon = FreezeLimit;
TransactionIdRetreat(snapshotConflictHorizon);
+ xlrec.logicalConflictPossible =
+ RelationIsAccessibleInLogicalDecoding(rel);
xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
xlrec.nplans = nplans;
@@ -8244,7 +8246,7 @@ bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate)
* update the heap page's LSN.
*/
XLogRecPtr
-log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
TransactionId snapshotConflictHorizon, uint8
vmflags)
{
xl_heap_visible xlrec;
@@ -8256,6 +8258,8 @@ log_heap_visible(RelFileLocator rlocator, Buffer
heap_buffer, Buffer vm_buffer,
xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
xlrec.flags = vmflags;
+ if (RelationIsAccessibleInLogicalDecoding(rel))
+ xlrec.flags |=
VISIBILITYMAP_ON_CATALOG_ACCESSIBLE_IN_LOGICAL_DECODING;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHeapVisible);
diff --git a/src/backend/access/heap/pruneheap.c
b/src/backend/access/heap/pruneheap.c
index 91c5f5e9ef..b2fc4d70ff 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -418,6 +418,8 @@ heap_page_prune(Relation relation, Buffer buffer,
xl_heap_prune xlrec;
XLogRecPtr recptr;
+ xlrec.logicalConflictPossible =
+ RelationIsAccessibleInLogicalDecoding(relation);
xlrec.snapshotConflictHorizon =
prstate.snapshotConflictHorizon;
xlrec.nredirected = prstate.nredirected;
xlrec.ndead = prstate.ndead;
diff --git a/src/backend/access/heap/visibilitymap.c
b/src/backend/access/heap/visibilitymap.c
index 4ed70275e2..0bd73f4d9f 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -283,8 +283,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer
heapBuf,
if (XLogRecPtrIsInvalid(recptr))
{
Assert(!InRecovery);
- recptr = log_heap_visible(rel->rd_locator,
heapBuf, vmBuf,
-
cutoff_xid, flags);
+ recptr = log_heap_visible(rel, heapBuf, vmBuf,
cutoff_xid, flags);
/*
* If data checksums are enabled (or
wal_log_hints=on), we
diff --git a/src/backend/access/nbtree/nbtpage.c
b/src/backend/access/nbtree/nbtpage.c
index 65aa44893c..cbefa6cd88 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -836,6 +836,8 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno,
FullTransactionId safexid)
*/
/* XLOG stuff */
+ xlrec_reuse.logicalConflictPossible =
+ RelationIsAccessibleInLogicalDecoding(rel);
xlrec_reuse.locator = rel->rd_locator;
xlrec_reuse.block = blkno;
xlrec_reuse.snapshotConflictHorizon = safexid;
@@ -1358,6 +1360,8 @@ _bt_delitems_delete(Relation rel, Buffer buf,
XLogRecPtr recptr;
xl_btree_delete xlrec_delete;
+ xlrec_delete.logicalConflictPossible =
+ RelationIsAccessibleInLogicalDecoding(rel);
xlrec_delete.snapshotConflictHorizon = snapshotConflictHorizon;
xlrec_delete.ndeleted = ndeletable;
xlrec_delete.nupdated = nupdatable;
diff --git a/src/backend/access/spgist/spgvacuum.c
b/src/backend/access/spgist/spgvacuum.c
index ad90b213b9..12747aee09 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -503,6 +503,8 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
spgxlogVacuumRedirect xlrec;
GlobalVisState *vistest;
+ xlrec.logicalConflictPossible =
+ RelationIsAccessibleInLogicalDecoding(index);
xlrec.nToPlaceholder = 0;
xlrec.snapshotConflictHorizon = InvalidTransactionId;
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 61f1d3926a..f7540f4101 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -123,7 +123,8 @@ static void UpdateIndexRelation(Oid indexoid, Oid heapoid,
bool
isexclusion,
bool immediate,
bool isvalid,
- bool isready);
+ bool isready,
+ bool
is_user_catalog);
static void index_update_stats(Relation rel,
bool hasindex,
double reltuples);
@@ -545,7 +546,8 @@ UpdateIndexRelation(Oid indexoid,
bool isexclusion,
bool immediate,
bool isvalid,
- bool isready)
+ bool isready,
+ bool is_user_catalog)
{
int2vector *indkey;
oidvector *indcollation;
@@ -622,6 +624,7 @@ UpdateIndexRelation(Oid indexoid,
values[Anum_pg_index_indcheckxmin - 1] = BoolGetDatum(false);
values[Anum_pg_index_indisready - 1] = BoolGetDatum(isready);
values[Anum_pg_index_indislive - 1] = BoolGetDatum(true);
+ values[Anum_pg_index_indisusercatalog - 1] =
BoolGetDatum(is_user_catalog);
values[Anum_pg_index_indisreplident - 1] = BoolGetDatum(false);
values[Anum_pg_index_indkey - 1] = PointerGetDatum(indkey);
values[Anum_pg_index_indcollation - 1] = PointerGetDatum(indcollation);
@@ -1020,7 +1023,8 @@ index_create(Relation heapRelation,
isprimary, is_exclusion,
(constr_flags &
INDEX_CONSTR_CREATE_DEFERRABLE) == 0,
!concurrent && !invalid,
- !concurrent);
+ !concurrent,
+
RelationIsUsedAsCatalogTable(heapRelation));
/*
* Register relcache invalidation on the indexes' heap relation, to
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 56dc995713..fd8200e670 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -103,6 +103,7 @@
#include "utils/syscache.h"
#include "utils/timestamp.h"
#include "utils/typcache.h"
+#include "utils/rel.h"
/*
* ON COMMIT action list
@@ -14148,6 +14149,10 @@ ATExecSetRelOptions(Relation rel, List *defList,
AlterTableType operation,
Datum repl_val[Natts_pg_class];
bool repl_null[Natts_pg_class];
bool repl_repl[Natts_pg_class];
+ ListCell *cell;
+ List *rel_options;
+ bool catalog_table_val = HEAP_DEFAULT_USER_CATALOG_TABLE;
+ bool catalog_table = false;
static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
if (defList == NIL && operation != AT_ReplaceRelOptions)
@@ -14214,7 +14219,6 @@ ATExecSetRelOptions(Relation rel, List *defList,
AlterTableType operation,
{
Query *view_query = get_view_query(rel);
List *view_options = untransformRelOptions(newOptions);
- ListCell *cell;
bool check_option = false;
foreach(cell, view_options)
@@ -14242,6 +14246,20 @@ ATExecSetRelOptions(Relation rel, List *defList,
AlterTableType operation,
}
}
+ /* If user_catalog_table is part of the new options, record its new
value */
+ rel_options = untransformRelOptions(newOptions);
+
+ foreach(cell, rel_options)
+ {
+ DefElem *defel = (DefElem *) lfirst(cell);
+
+ if (strcmp(defel->defname, "user_catalog_table") == 0)
+ {
+ catalog_table = true;
+ catalog_table_val = defGetBoolean(defel);
+ }
+ }
+
/*
* All we need do here is update the pg_class row; the new options will
be
* propagated into relcaches during post-commit cache inval.
@@ -14268,6 +14286,41 @@ ATExecSetRelOptions(Relation rel, List *defList,
AlterTableType operation,
ReleaseSysCache(tuple);
+ /* Update the indexes if there is a need to */
+ if (catalog_table || operation == AT_ResetRelOptions)
+ {
+ Relation pg_index;
+ HeapTuple pg_index_tuple;
+ Form_pg_index pg_index_form;
+ ListCell *index;
+
+ pg_index = table_open(IndexRelationId, RowExclusiveLock);
+
+ foreach(index, RelationGetIndexList(rel))
+ {
+ Oid thisIndexOid =
lfirst_oid(index);
+
+ pg_index_tuple = SearchSysCacheCopy1(INDEXRELID,
+
ObjectIdGetDatum(thisIndexOid));
+ if (!HeapTupleIsValid(pg_index_tuple))
+ elog(ERROR, "cache lookup failed for index %u",
thisIndexOid);
+ pg_index_form = (Form_pg_index)
GETSTRUCT(pg_index_tuple);
+
+ /* Modify the index only if user_catalog_table differs */
+ if (catalog_table_val !=
pg_index_form->indisusercatalog)
+ {
+ pg_index_form->indisusercatalog =
catalog_table_val;
+ CatalogTupleUpdate(pg_index,
&pg_index_tuple->t_self, pg_index_tuple);
+ InvokeObjectPostAlterHookArg(IndexRelationId,
thisIndexOid, 0,
+
InvalidOid, true);
+ }
+
+ heap_freetuple(pg_index_tuple);
+ }
+
+ table_close(pg_index, RowExclusiveLock);
+ }
+
/* repeat the whole exercise for the toast table, if there's one */
if (OidIsValid(rel->rd_rel->reltoastrelid))
{
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 33f1c7e31b..d954c0a9da 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -51,13 +51,14 @@ typedef struct gistxlogDelete
{
TransactionId snapshotConflictHorizon;
uint16 ntodelete; /* number of deleted offsets */
+ bool logicalConflictPossible;
/*
* In payload of blk 0 : todelete OffsetNumbers
*/
} gistxlogDelete;
-#define SizeOfGistxlogDelete (offsetof(gistxlogDelete, ntodelete) +
sizeof(uint16))
+#define SizeOfGistxlogDelete (offsetof(gistxlogDelete,
logicalConflictPossible) + sizeof(bool))
/*
* Backup Blk 0: If this operation completes a page split, by inserting a
@@ -100,9 +101,10 @@ typedef struct gistxlogPageReuse
RelFileLocator locator;
BlockNumber block;
FullTransactionId snapshotConflictHorizon;
+ bool logicalConflictPossible;
} gistxlogPageReuse;
-#define SizeOfGistxlogPageReuse (offsetof(gistxlogPageReuse,
snapshotConflictHorizon) + sizeof(FullTransactionId))
+#define SizeOfGistxlogPageReuse (offsetof(gistxlogPageReuse,
logicalConflictPossible) + sizeof(bool))
extern void gist_redo(XLogReaderState *record);
extern void gist_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 6dafb4a598..c17d997b99 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -252,12 +252,13 @@ typedef struct xl_hash_vacuum_one_page
{
TransactionId snapshotConflictHorizon;
int ntuples;
+ bool logicalConflictPossible;
/* TARGET OFFSET NUMBERS FOLLOW AT THE END */
} xl_hash_vacuum_one_page;
#define SizeOfHashVacuumOnePage \
- (offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(int))
+ (offsetof(xl_hash_vacuum_one_page, logicalConflictPossible) +
sizeof(bool))
extern void hash_redo(XLogReaderState *record);
extern void hash_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 5c77290eec..76991d9d8f 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -245,10 +245,11 @@ typedef struct xl_heap_prune
TransactionId snapshotConflictHorizon;
uint16 nredirected;
uint16 ndead;
+ bool logicalConflictPossible;
/* OFFSET NUMBERS are in the block reference 0 */
} xl_heap_prune;
-#define SizeOfHeapPrune (offsetof(xl_heap_prune, ndead) + sizeof(uint16))
+#define SizeOfHeapPrune (offsetof(xl_heap_prune, logicalConflictPossible) +
sizeof(bool))
/*
* The vacuum page record is similar to the prune record, but can only mark
@@ -344,12 +345,13 @@ typedef struct xl_heap_freeze_page
{
TransactionId snapshotConflictHorizon;
uint16 nplans;
+ bool logicalConflictPossible;
/* FREEZE PLANS FOLLOW */
/* OFFSET NUMBER ARRAY FOLLOWS */
} xl_heap_freeze_page;
-#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, nplans) +
sizeof(uint16))
+#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page,
logicalConflictPossible) + sizeof(bool))
/*
* This is what we need to know about setting a visibility map bit
@@ -408,7 +410,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState
*record);
extern const char *heap2_identify(uint8 info);
extern void heap_xlog_logical_rewrite(XLogReaderState *r);
-extern XLogRecPtr log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
Buffer
vm_buffer,
TransactionId snapshotConflictHorizon,
uint8
vmflags);
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 3b2d959c69..c69e053869 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -188,9 +188,10 @@ typedef struct xl_btree_reuse_page
RelFileLocator locator;
BlockNumber block;
FullTransactionId snapshotConflictHorizon;
+ bool logicalConflictPossible;
} xl_btree_reuse_page;
-#define SizeOfBtreeReusePage (sizeof(xl_btree_reuse_page))
+#define SizeOfBtreeReusePage (offsetof(xl_btree_reuse_page,
logicalConflictPossible) + sizeof(bool))
/*
* xl_btree_vacuum and xl_btree_delete records describe deletion of index
@@ -235,13 +236,14 @@ typedef struct xl_btree_delete
TransactionId snapshotConflictHorizon;
uint16 ndeleted;
uint16 nupdated;
+ bool logicalConflictPossible;
/* DELETED TARGET OFFSET NUMBERS FOLLOW */
/* UPDATED TARGET OFFSET NUMBERS FOLLOW */
/* UPDATED TUPLES METADATA (xl_btree_update) ARRAY FOLLOWS */
} xl_btree_delete;
-#define SizeOfBtreeDelete (offsetof(xl_btree_delete, nupdated) +
sizeof(uint16))
+#define SizeOfBtreeDelete (offsetof(xl_btree_delete,
logicalConflictPossible) + sizeof(bool))
/*
* The offsets that appear in xl_btree_update metadata are offsets into the
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index 82332cb694..1d92af6b2d 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -240,6 +240,7 @@ typedef struct spgxlogVacuumRedirect
uint16 nToPlaceholder; /* number of redirects to make
placeholders */
OffsetNumber firstPlaceholder; /* first placeholder tuple to remove */
TransactionId snapshotConflictHorizon; /* newest XID of removed
redirects */
+ bool logicalConflictPossible;
/* offsets of redirect tuples to make placeholders follow */
OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
diff --git a/src/include/access/visibilitymapdefs.h
b/src/include/access/visibilitymapdefs.h
index 2803ef5c07..6005df3278 100644
--- a/src/include/access/visibilitymapdefs.h
+++ b/src/include/access/visibilitymapdefs.h
@@ -17,9 +17,10 @@
#define BITS_PER_HEAPBLOCK 2
/* Flags for bit map */
-#define VISIBILITYMAP_ALL_VISIBLE 0x01
-#define VISIBILITYMAP_ALL_FROZEN 0x02
-#define VISIBILITYMAP_VALID_BITS 0x03 /* OR of all valid visibilitymap
-
* flags bits */
+#define VISIBILITYMAP_ALL_VISIBLE
0x01
+#define VISIBILITYMAP_ALL_FROZEN
0x02
+#define VISIBILITYMAP_VALID_BITS
0x03 /* OR of all valid visibilitymap
+
* flags bits */
+#define VISIBILITYMAP_ON_CATALOG_ACCESSIBLE_IN_LOGICAL_DECODING 0x04
#endif /* VISIBILITYMAPDEFS_H
*/
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index f853846ee1..dd16431378 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -43,6 +43,8 @@ CATALOG(pg_index,2610,IndexRelationId) BKI_SCHEMA_MACRO
bool indcheckxmin; /* must we wait for xmin to be old? */
bool indisready; /* is this index ready for
inserts? */
bool indislive; /* is this index alive at all?
*/
+ bool indisusercatalog; /* is this index linked to a user catalog
+ * relation? */
bool indisreplident; /* is this index the identity for
replication? */
/* variable-length fields start here, but we allow direct access to
indkey */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index f383a2fca9..5d41ef6505 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -27,6 +27,7 @@
#include "storage/smgr.h"
#include "utils/relcache.h"
#include "utils/reltrigger.h"
+#include "catalog/catalog.h"
/*
@@ -343,6 +344,7 @@ typedef struct StdRdOptions
#define HEAP_MIN_FILLFACTOR 10
#define HEAP_DEFAULT_FILLFACTOR 100
+#define HEAP_DEFAULT_USER_CATALOG_TABLE false
/*
* RelationGetToastTupleTarget
@@ -385,6 +387,15 @@ typedef struct StdRdOptions
(relation)->rd_rel->relkind == RELKIND_MATVIEW) ? \
((StdRdOptions *) (relation)->rd_options)->user_catalog_table : false)
+/*
+ * IndexIsLinkedToUserCatalogTable
+ * Returns whether the relation should be treated as an index linked to
+ * a user catalog table from the point of view of logical decoding.
+ */
+#define IndexIsLinkedToUserCatalogTable(relation) \
+ ((relation)->rd_rel->relkind == RELKIND_INDEX && \
+ (relation)->rd_index->indisusercatalog)
+
/*
* RelationGetParallelWorkers
* Returns the relation's parallel_workers reloption setting.
@@ -682,7 +693,8 @@ RelationCloseSmgr(Relation relation)
#define RelationIsAccessibleInLogicalDecoding(relation) \
(XLogLogicalInfoActive() && \
RelationNeedsWAL(relation) && \
- (IsCatalogRelation(relation) ||
RelationIsUsedAsCatalogTable(relation)))
+ (IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)
|| \
+ IndexIsLinkedToUserCatalogTable(relation)))
/*
* RelationIsLogicallyLogged
--
2.34.1