On Mon, Mar 4, 2024 at 2:11 PM Bertrand Drouvot <bertranddrouvot...@gmail.com> wrote:
>
> On Sun, Mar 03, 2024 at 03:44:34PM -0600, Nathan Bossart wrote:
> > On Sun, Mar 03, 2024 at 11:40:00PM +0530, Bharath Rupireddy wrote:
> > > On Sat, Mar 2, 2024 at 3:41 AM Nathan Bossart <nathandboss...@gmail.com>
> > > wrote:
> > >> Would you ever see "conflict" as false and "invalidation_reason" as
> > >> non-null for a logical slot?
> > >
> > > No. Because both conflict and invalidation_reason are decided based on
> > > the invalidation reason, i.e. the value of slot_contents.data.invalidated.
> > > IOW, a logical slot that reports conflict as true must have been
> > > invalidated.
> > >
> > > Do you have any thoughts on reverting 007693f and introducing
> > > invalidation_reason?
> >
> > Unless I am misinterpreting some details, ISTM we could rename this column
> > to invalidation_reason and use it for both logical and physical slots. I'm
> > not seeing a strong need for another column.
>
> Yeah, having two columns was more for convenience purposes. Without the
> "conflict" one, a slot conflicting with recovery would be "a logical slot
> having a non-NULL invalidation_reason".
>
> I'm also fine with one column if most of you prefer that way.
While we debate on the above, please find the attached v7 patch set after rebasing.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 906f8829f7b6bf1da4b37edf2e4d5a46a7227400 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Tue, 5 Mar 2024 18:56:00 +0000 Subject: [PATCH v7 1/4] Track invalidation_reason in pg_replication_slots Currently the reason for replication slot invalidation is not tracked in pg_replication_slots. A recent commit 007693f2a added conflict_reason to show the reasons for slot invalidation, but only for logical slots. This commit adds invalidation_reason to pg_replication_slots to show invalidation reasons for both physical and logical slots. --- doc/src/sgml/system-views.sgml | 32 ++++++++++++++++++++++++++++ src/backend/catalog/system_views.sql | 3 ++- src/backend/replication/slotfuncs.c | 12 ++++++++--- src/include/catalog/pg_proc.dat | 6 +++--- src/test/regress/expected/rules.out | 5 +++-- 5 files changed, 49 insertions(+), 9 deletions(-) diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index be90edd0e2..cce88c14bb 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2581,6 +2581,38 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx </para></entry> </row> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>invalidation_reason</structfield> <type>text</type> + </para> + <para> + The reason for the slot's invalidation. <literal>NULL</literal> if the + slot is currently actively being used. The non-NULL values indicate that + the slot is marked as invalidated. Possible values are: + <itemizedlist spacing="compact"> + <listitem> + <para> + <literal>wal_removed</literal> means that the required WAL has been + removed. + </para> + </listitem> + <listitem> + <para> + <literal>rows_removed</literal> means that the required rows have + been removed. 
+ </para> + </listitem> + <listitem> + <para> + <literal>wal_level_insufficient</literal> means that the + primary doesn't have a <xref linkend="guc-wal-level"/> sufficient to + perform logical decoding. + </para> + </listitem> + </itemizedlist> + </para></entry> + </row> + </tbody> </tgroup> </table> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 04227a72d1..c39f0d73d3 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1025,7 +1025,8 @@ CREATE VIEW pg_replication_slots AS L.two_phase, L.conflict_reason, L.failover, - L.synced + L.synced, + L.invalidation_reason FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 768a304723..a7a250b7c5 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 17 +#define PG_GET_REPLICATION_SLOTS_COLS 18 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -263,6 +263,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; WALAvailability walstate; int i; + ReplicationSlotInvalidationCause cause; if (!slot->in_use) continue; @@ -409,12 +410,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.two_phase); + cause = slot_contents.data.invalidated; + if (slot_contents.data.database == InvalidOid) nulls[i++] = true; else { - ReplicationSlotInvalidationCause cause = slot_contents.data.invalidated; - if (cause == RS_INVAL_NONE) nulls[i++] = true; else @@ -425,6 +426,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.synced); + if (cause == RS_INVAL_NONE) + nulls[i++] = true; + 
else + values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 291ed876fc..17eae8847b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11120,9 +11120,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool,text}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced,invalidation_reason}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 0cd2c64fca..e77bb36afe 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1475,8 +1475,9 @@ pg_replication_slots| SELECT l.slot_name, l.two_phase, l.conflict_reason, l.failover, - l.synced - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, 
two_phase, conflict_reason, failover, synced) + l.synced, + l.invalidation_reason + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced, invalidation_reason) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.34.1
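As an aside on patch 1's slot.c side: the cause-to-name mapping it extends is a designated-initializer array whose length is static-asserted against the last enum member, so adding a cause without a name fails at compile time. A minimal standalone sketch of that pattern follows; the enum and string names are taken from the patch, while the `cause_name()` helper is purely illustrative (the view-building code in slotfuncs.c does this inline):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef enum ReplicationSlotInvalidationCause
{
    RS_INVAL_NONE,
    RS_INVAL_WAL_REMOVED,       /* required WAL removed */
    RS_INVAL_HORIZON,           /* required rows removed */
    RS_INVAL_WAL_LEVEL,         /* wal_level insufficient for slot */
} ReplicationSlotInvalidationCause;

/*
 * Designated initializers keep the array in sync with the enum order;
 * RS_INVAL_NONE (index 0) is implicitly NULL.
 */
static const char *const SlotInvalidationCauses[] = {
    [RS_INVAL_WAL_REMOVED] = "wal_removed",
    [RS_INVAL_HORIZON] = "rows_removed",
    [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
};

#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
#define lengthof(array) (sizeof(array) / sizeof((array)[0]))

/* Fails to compile if a new cause is added without a matching name. */
_Static_assert(lengthof(SlotInvalidationCauses) == RS_INVAL_MAX_CAUSES + 1,
               "array length mismatch");

/* Illustrative helper: the view emits SQL NULL for RS_INVAL_NONE. */
static const char *
cause_name(ReplicationSlotInvalidationCause cause)
{
    return (cause == RS_INVAL_NONE) ? NULL : SlotInvalidationCauses[cause];
}
```

Patches 2 and 4 each append one entry to this array and bump `RS_INVAL_MAX_CAUSES`, which is exactly the maintenance step the static assertion guards.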
From af48594e7a99277219384f0da702f26a48527aa3 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 5 Mar 2024 18:57:25 +0000
Subject: [PATCH v7 2/4] Add XID based replication slot invalidation

Currently postgres has the ability to invalidate inactive replication slots based on the amount of WAL (set via the max_slot_wal_keep_size GUC) that will be needed for the slots in case they become active. However, choosing a default value for max_slot_wal_keep_size is tricky: the amount of WAL a customer generates and their allocated storage vary greatly in production, making it difficult to pin down a one-size-fits-all value. It is often easier for developers to set an XID age (age of a slot's xmin or catalog_xmin) of, say, 1 or 1.5 billion, after which the slots get invalidated.

To achieve the above, postgres uses the replication slot's xmin (the oldest transaction that this slot needs the database to retain) or catalog_xmin (the oldest transaction affecting the system catalogs that this slot needs the database to retain), together with a new GUC max_slot_xid_age. The checkpointer then looks at all replication slots, invalidating those whose xmin or catalog_xmin has exceeded the configured age.
--- doc/src/sgml/config.sgml | 21 ++++ src/backend/access/transam/xlog.c | 10 ++ src/backend/replication/slot.c | 44 ++++++- src/backend/utils/misc/guc_tables.c | 10 ++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/replication/slot.h | 3 + src/test/recovery/meson.build | 1 + src/test/recovery/t/050_invalidate_slots.pl | 108 ++++++++++++++++++ 8 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 src/test/recovery/t/050_invalidate_slots.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index b38cbd714a..7a8360cd32 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4544,6 +4544,27 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows </listitem> </varlistentry> + <varlistentry id="guc-max-slot-xid-age" xreflabel="max_slot_xid_age"> + <term><varname>max_slot_xid_age</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_slot_xid_age</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Invalidate replication slots whose <literal>xmin</literal> (the oldest + transaction that this slot needs the database to retain) or + <literal>catalog_xmin</literal> (the oldest transaction affecting the + system catalogs that this slot needs the database to retain) has reached + the age specified by this setting. A value of zero (which is default) + disables this feature. Users can set this value anywhere from zero to + two billion. This parameter can only be set in the + <filename>postgresql.conf</filename> file or on the server command + line. 
+ </para> + </listitem> + </varlistentry> + <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp"> <term><varname>track_commit_timestamp</varname> (<type>boolean</type>) <indexterm> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 20a5f86209..36ae2ac6a4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7147,6 +7147,11 @@ CreateCheckPoint(int flags) if (PriorRedoPtr != InvalidXLogRecPtr) UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr); + /* Invalidate replication slots based on xmin or catalog_xmin age */ + if (max_slot_xid_age > 0) + InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0, + InvalidOid, InvalidTransactionId); + /* * Delete old log files, those no longer needed for last checkpoint to * prevent the disk holding the xlog from growing full. @@ -7597,6 +7602,11 @@ CreateRestartPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); + /* Invalidate replication slots based on xmin or catalog_xmin age */ + if (max_slot_xid_age > 0) + InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0, + InvalidOid, InvalidTransactionId); + /* * Retreat _logSegNo using the current end of xlog replayed or received, * whichever is later. 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2614f98ddd..febe57ff47 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -85,10 +85,11 @@ const char *const SlotInvalidationCauses[] = { [RS_INVAL_WAL_REMOVED] = "wal_removed", [RS_INVAL_HORIZON] = "rows_removed", [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient", + [RS_INVAL_XID_AGE] = "xid_aged", }; /* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL +#define RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), "array length mismatch"); @@ -118,6 +119,7 @@ ReplicationSlot *MyReplicationSlot = NULL; /* GUC variable */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +int max_slot_xid_age = 0; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropPtr(ReplicationSlot *slot); @@ -1446,6 +1448,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, case RS_INVAL_WAL_LEVEL: appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server.")); break; + case RS_INVAL_XID_AGE: + appendStringInfoString(&err_detail, _("The replication slot's xmin or catalog_xmin reached the age specified by max_slot_xid_age.")); + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1562,6 +1567,42 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, if (SlotIsLogical(s)) conflict = cause; break; + case RS_INVAL_XID_AGE: + { + TransactionId xid_cur = ReadNextTransactionId(); + TransactionId xid_limit; + TransactionId xid_slot; + + if (TransactionIdIsNormal(s->data.xmin)) + { + xid_slot = s->data.xmin; + + xid_limit = xid_slot + max_slot_xid_age; + if (xid_limit < FirstNormalTransactionId) + xid_limit += FirstNormalTransactionId; + + if (TransactionIdFollowsOrEquals(xid_cur, xid_limit)) + { + conflict = cause; + break; + } + } 
+ if (TransactionIdIsNormal(s->data.catalog_xmin)) + { + xid_slot = s->data.catalog_xmin; + + xid_limit = xid_slot + max_slot_xid_age; + if (xid_limit < FirstNormalTransactionId) + xid_limit += FirstNormalTransactionId; + + if (TransactionIdFollowsOrEquals(xid_cur, xid_limit)) + { + conflict = cause; + break; + } + } + } + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1716,6 +1757,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given * db; dboid may be InvalidOid for shared relations * - RS_INVAL_WAL_LEVEL: is logical + * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 45013582a7..3ed642dcaf 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -2954,6 +2954,16 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."), + gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.") + }, + &max_slot_xid_age, + 0, 0, 2000000000, + NULL, NULL, NULL + }, + { {"commit_delay", PGC_SUSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index edcc0282b2..50019d7c25 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -334,6 +334,7 @@ #wal_sender_timeout = 60s # in milliseconds; 0 disables #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires 
restart) +#max_slot_xid_age = 0 # - Primary Server - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index acbf567150..ad9fd1e94b 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_HORIZON, /* wal_level insufficient for slot */ RS_INVAL_WAL_LEVEL, + /* slot's xmin or catalog_xmin has reached the age */ + RS_INVAL_XID_AGE, } ReplicationSlotInvalidationCause; extern PGDLLIMPORT const char *const SlotInvalidationCauses[]; @@ -226,6 +228,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT int max_slot_xid_age; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index c67249500e..d698c3ec73 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -50,6 +50,7 @@ tests += { 't/039_end_of_wal.pl', 't/040_standby_failover_slots_sync.pl', 't/041_checkpoint_at_promote.pl', + 't/050_invalidate_slots.pl', ], }, } diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl new file mode 100644 index 0000000000..2f482b56e8 --- /dev/null +++ b/src/test/recovery/t/050_invalidate_slots.pl @@ -0,0 +1,108 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +# Test for replication slots invalidation +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Utils; +use PostgreSQL::Test::Cluster; +use Test::More; +use Time::HiRes qw(usleep); + +# Initialize primary node, setting wal-segsize to 1MB +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 1, extra => ['--wal-segsize=1']); +$primary->append_conf( + 'postgresql.conf', q{ +checkpoint_timeout = 1h +}); +$primary->start; +$primary->safe_psql( + 'postgres', qq[ + SELECT 
pg_create_physical_replication_slot('sb1_slot'); +]); + +# Take backup +my $backup_name = 'my_backup'; +$primary->backup($backup_name); + +# Create a standby linking to the primary using the replication slot +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup($primary, $backup_name, has_streaming => 1); + +# Enable hs_feedback. The slot should gain an xmin. We set the status interval +# so we'll see the results promptly. +$standby1->append_conf( + 'postgresql.conf', q{ +primary_slot_name = 'sb1_slot' +hot_standby_feedback = on +wal_receiver_status_interval = 1 +}); +$standby1->start; + +# Create some content on primary to move xmin +$primary->safe_psql('postgres', + "CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a"); + +# Wait until standby has replayed enough data +$primary->wait_for_catchup($standby1); + +$primary->poll_query_until( + 'postgres', qq[ + SELECT xmin IS NOT NULL + FROM pg_catalog.pg_replication_slots + WHERE slot_name = 'sb1_slot'; +]) or die "Timed out waiting for slot xmin to advance"; + +$primary->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET max_slot_xid_age = 500; +]); +$primary->reload; + +# Stop standby to make the replication slot's xmin on primary to age +$standby1->stop; + +my $logstart = -s $primary->logfile; + +# Do some work to advance xmin +$primary->safe_psql( + 'postgres', q{ +do $$ +begin + for i in 10000..11000 loop + -- use an exception block so that each iteration eats an XID + begin + insert into tab_int values (i); + exception + when division_by_zero then null; + end; + end loop; +end$$; +}); + +my $invalidated = 0; +for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $primary->safe_psql('postgres', "CHECKPOINT"); + if ($primary->log_contains( + 'invalidating obsolete replication slot "sb1_slot"', $logstart)) + { + $invalidated = 1; + last; + } + usleep(100_000); +} +ok($invalidated, 'check that slot sb1_slot invalidation has been logged'); + +# Wait 
for the inactive replication slots to be invalidated. +$primary->poll_query_until( + 'postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE slot_name = 'sb1_slot' AND + invalidation_reason = 'xid_aged'; +]) + or die + "Timed out while waiting for replication slot sb1_slot to be invalidated"; + +done_testing(); -- 2.34.1
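The core of patch 2 is the wraparound-aware age check in InvalidatePossiblyObsoleteSlot(): it computes xid_limit = xid_slot + max_slot_xid_age, skips over the special XIDs below FirstNormalTransactionId if the sum wraps, and then compares with modulo-2^32 semantics (TransactionIdFollowsOrEquals). A standalone sketch of that arithmetic, with simplified re-implementations of the server routines (not the patch's actual code):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FirstNormalTransactionId ((TransactionId) 3)

/* Modulo-2^32 comparison, as in PostgreSQL's TransactionIdFollowsOrEquals. */
static bool
xid_follows_or_equals(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) >= 0;
}

/*
 * Return true if the slot's xmin (or catalog_xmin) has aged past max_age,
 * mirroring the patch's wraparound handling.
 */
static bool
xid_age_exceeded(TransactionId xid_cur, TransactionId xid_slot, int max_age)
{
    TransactionId xid_limit = xid_slot + (TransactionId) max_age;

    /* If the addition wrapped into the special XIDs 0..2, skip past them. */
    if (xid_limit < FirstNormalTransactionId)
        xid_limit += FirstNormalTransactionId;

    return xid_follows_or_equals(xid_cur, xid_limit);
}
```

The unsigned subtraction inside the comparison is what makes the check keep working when the next XID has wrapped past the slot's xmin; the patch applies the same computation twice, once for xmin and once for catalog_xmin.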
From c4501bcffd6149174245d16b00ca2571e46bb6cb Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 5 Mar 2024 18:57:42 +0000
Subject: [PATCH v7 3/4] Track inactive replication slot information

Currently postgres doesn't track metrics like the time at which a slot became inactive, or the total number of times the slot became inactive in its lifetime. This commit adds two new metrics, last_inactive_at of type timestamptz and inactive_count of type numeric, to ReplicationSlotPersistentData. Whenever a slot becomes inactive, the current timestamp and the inactive count are persisted to disk.

These metrics are useful in the following ways:

- To improve replication slot monitoring tools. For instance, one can build a monitoring tool that signals a) when a replication slot has been lying inactive for a day or so, using the last_inactive_at metric, and b) when a replication slot is becoming inactive too frequently, using the inactive_count metric.

- To implement timeout-based inactive replication slot management capability in postgres.

Increases SLOT_VERSION due to the two new metrics added.
--- doc/src/sgml/system-views.sgml | 20 +++++++++++++ src/backend/catalog/system_views.sql | 4 ++- src/backend/replication/slot.c | 43 ++++++++++++++++++++++------ src/backend/replication/slotfuncs.c | 15 +++++++++- src/include/catalog/pg_proc.dat | 6 ++-- src/include/replication/slot.h | 6 ++++ src/test/regress/expected/rules.out | 6 ++-- 7 files changed, 84 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index cce88c14bb..0dfd472b02 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2771,6 +2771,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx ID of role </para></entry> </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>last_inactive_at</structfield> <type>timestamptz</type> + </para> + <para> + The time at which the slot became inactive. + <literal>NULL</literal> if the slot is currently actively being + used. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>inactive_count</structfield> <type>numeric</type> + </para> + <para> + The total number of times the slot became inactive in its lifetime. 
+ </para></entry> + </row> </tbody> </tgroup> </table> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c39f0d73d3..a5a78a9910 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1026,7 +1026,9 @@ CREATE VIEW pg_replication_slots AS L.conflict_reason, L.failover, L.synced, - L.invalidation_reason + L.invalidation_reason, + L.last_inactive_at, + L.inactive_count FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index febe57ff47..8066ea3b28 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -108,7 +108,7 @@ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 5 /* version for new files */ +#define SLOT_VERSION 6 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -363,6 +363,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; slot->data.synced = synced; + slot->data.last_inactive_at = 0; + slot->data.inactive_count = 0; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -589,6 +591,17 @@ retry: if (am_walsender) { + if (s->data.persistency == RS_PERSISTENT) + { + SpinLockAcquire(&s->mutex); + s->data.last_inactive_at = 0; + SpinLockRelease(&s->mutex); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } + ereport(log_replication_commands ? LOG : DEBUG1, SlotIsLogical(s) ? 
errmsg("acquired logical replication slot \"%s\"", @@ -656,16 +669,20 @@ ReplicationSlotRelease(void) ConditionVariableBroadcast(&slot->active_cv); } - MyReplicationSlot = NULL; - - /* might not have been set when we've been a plain slot */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING; - ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; - LWLockRelease(ProcArrayLock); - if (am_walsender) { + if (slot->data.persistency == RS_PERSISTENT) + { + SpinLockAcquire(&slot->mutex); + slot->data.last_inactive_at = GetCurrentTimestamp(); + slot->data.inactive_count++; + SpinLockRelease(&slot->mutex); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } + ereport(log_replication_commands ? LOG : DEBUG1, is_logical ? errmsg("released logical replication slot \"%s\"", @@ -675,6 +692,14 @@ ReplicationSlotRelease(void) pfree(slotname); } + + MyReplicationSlot = NULL; + + /* might not have been set when we've been a plain slot */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING; + ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; + LWLockRelease(ProcArrayLock); } /* diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index a7a250b7c5..3bb4e9223e 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -239,10 +239,11 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 18 +#define PG_GET_REPLICATION_SLOTS_COLS 20 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; + char buf[256]; /* * We don't require any special permission to see this function's data @@ -431,6 +432,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]); + if 
(slot_contents.data.last_inactive_at > 0) + values[i++] = TimestampTzGetDatum(slot_contents.data.last_inactive_at); + else + nulls[i++] = true; + + /* Convert to numeric. */ + snprintf(buf, sizeof buf, UINT64_FORMAT, slot_contents.data.inactive_count); + values[i++] = DirectFunctionCall3(numeric_in, + CStringGetDatum(buf), + ObjectIdGetDatum(0), + Int32GetDatum(-1)); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 17eae8847b..681e329293 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11120,9 +11120,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool,text}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced,invalidation_reason}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool,text,timestamptz,numeric}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced,invalidation_reason,last_inactive_at,inactive_count}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index ad9fd1e94b..83b47425ea 100644 --- 
a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -129,6 +129,12 @@ typedef struct ReplicationSlotPersistentData * for logical slots on the primary server. */ bool failover; + + /* When did this slot become inactive last time? */ + TimestampTz last_inactive_at; + + /* How many times the slot has been inactive? */ + uint64 inactive_count; } ReplicationSlotPersistentData; /* diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e77bb36afe..b451c324f9 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1476,8 +1476,10 @@ pg_replication_slots| SELECT l.slot_name, l.conflict_reason, l.failover, l.synced, - l.invalidation_reason - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced, invalidation_reason) + l.invalidation_reason, + l.last_inactive_at, + l.inactive_count + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced, invalidation_reason, last_inactive_at, inactive_count) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.34.1
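The bookkeeping patch 3 adds to slot.c is symmetric: releasing a persistent slot from a walsender stamps last_inactive_at and bumps inactive_count, while re-acquiring it zeroes the timestamp; both paths then mark the slot dirty and flush it to disk. A toy model of just that state machine, with standalone types and the timestamp passed in explicitly instead of via GetCurrentTimestamp() (the function names here are illustrative, not the patch's):

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t TimestampTz;        /* microseconds, as in PostgreSQL */

typedef struct SlotInactiveStats
{
    TimestampTz last_inactive_at;   /* 0 while the slot is actively held */
    uint64_t    inactive_count;     /* lifetime number of releases */
} SlotInactiveStats;

/* Walsender acquires the slot: it is active again, clear the timestamp. */
static void
slot_mark_acquired(SlotInactiveStats *s)
{
    s->last_inactive_at = 0;
    /* real code: ReplicationSlotMarkDirty(); ReplicationSlotSave(); */
}

/* Walsender releases the slot: record when, and how often it has happened. */
static void
slot_mark_released(SlotInactiveStats *s, TimestampTz now)
{
    s->last_inactive_at = now;
    s->inactive_count++;
    /* real code: ReplicationSlotMarkDirty(); ReplicationSlotSave(); */
}
```

Note that in the patch both updates happen under the slot's spinlock and only for RS_PERSISTENT slots acquired by a walsender; the sketch omits that locking.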
From d29ac5e3004dc512300c9d07dc9d026ba979d066 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 5 Mar 2024 18:59:05 +0000
Subject: [PATCH v7 4/4] Add inactive_timeout based replication slot invalidation

Currently postgres has the ability to invalidate inactive replication slots based on the amount of WAL (set via the max_slot_wal_keep_size GUC) that will be needed for the slots in case they become active. However, choosing a default value for max_slot_wal_keep_size is tricky: the amount of WAL a customer generates and their allocated storage vary greatly in production, making it difficult to pin down a one-size-fits-all value. It is often easier for developers to set a timeout of, say, 1, 2 or 3 days, after which the inactive slots get invalidated.

To achieve the above, postgres uses the replication slot metric last_inactive_at (the time at which the slot became inactive) and a new GUC inactive_replication_slot_timeout. The checkpointer then looks at all replication slots, invalidating the inactive slots based on the timeout set.
--- doc/src/sgml/config.sgml | 18 +++++ src/backend/access/transam/xlog.c | 10 +++ src/backend/replication/slot.c | 22 +++++- src/backend/utils/misc/guc_tables.c | 12 +++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/replication/slot.h | 3 + src/test/recovery/t/050_invalidate_slots.pl | 79 +++++++++++++++++++ 7 files changed, 144 insertions(+), 1 deletion(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 7a8360cd32..f5c299ef73 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4565,6 +4565,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows </listitem> </varlistentry> + <varlistentry id="guc-inactive-replication-slot-timeout" xreflabel="inactive_replication_slot_timeout"> + <term><varname>inactive_replication_slot_timeout</varname> (<type>integer</type>) + <indexterm> + <primary><varname>inactive_replication_slot_timeout</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Invalidate replication slots that are inactive for longer than this + amount of time at the next checkpoint. If this value is specified + without units, it is taken as seconds. A value of zero (which is + default) disables the timeout mechanism. This parameter can only be + set in the <filename>postgresql.conf</filename> file or on the server + command line. 
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
        <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 36ae2ac6a4..166c3ed794 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7152,6 +7152,11 @@ CreateCheckPoint(int flags)
 	InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
 									   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
@@ -7607,6 +7612,11 @@ CreateRestartPoint(int flags)
 	InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
 									   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 8066ea3b28..060cb7d66e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -86,10 +86,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
 	[RS_INVAL_XID_AGE] = "xid_aged",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -120,6 +121,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 int			max_slot_xid_age = 0;
+int			inactive_replication_slot_timeout = 0;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -1476,6 +1478,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_XID_AGE:
 			appendStringInfoString(&err_detail, _("The replication slot's xmin or catalog_xmin reached the age specified by max_slot_xid_age."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by inactive_replication_slot_timeout."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1628,6 +1633,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					}
 				}
 				break;
+			case RS_INVAL_INACTIVE_TIMEOUT:
+				if (s->data.last_inactive_at > 0)
+				{
+					TimestampTz now;
+
+					Assert(s->data.persistency == RS_PERSISTENT);
+					Assert(s->active_pid == 0);
+
+					now = GetCurrentTimestamp();
+					if (TimestampDifferenceExceeds(s->data.last_inactive_at, now,
+												   inactive_replication_slot_timeout * 1000))
+						conflict = cause;
+				}
+				break;
 			case RS_INVAL_NONE:
 				pg_unreachable();
 		}
@@ -1783,6 +1802,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  *	 db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
  * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age
+ * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 3ed642dcaf..06e3e87f4a 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2964,6 +2964,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"inactive_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time to wait before invalidating an "
+						 "inactive replication slot."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&inactive_replication_slot_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 50019d7c25..092aaf1bec 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -261,6 +261,7 @@
 #recovery_prefetch = try	# prefetch pages referenced in the WAL?
 #wal_decode_buffer_size = 512kB	# lookahead window used for prefetching
 					# (change requires restart)
+#inactive_replication_slot_timeout = 0	# in seconds; 0 disables
 
 # - Archiving -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 83b47425ea..708cbee324 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -55,6 +55,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_WAL_LEVEL,
 	/* slot's xmin or catalog_xmin has reached the age */
 	RS_INVAL_XID_AGE,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -235,6 +237,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT int max_slot_xid_age;
+extern PGDLLIMPORT int inactive_replication_slot_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
index 2f482b56e8..4c66dd4a4e 100644
--- a/src/test/recovery/t/050_invalidate_slots.pl
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -105,4 +105,83 @@ $primary->poll_query_until(
   or die
   "Timed out while waiting for replication slot sb1_slot to be invalidated";
 
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot('sb2_slot');
+]);
+
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET max_slot_xid_age = 0;
+]);
+$primary->reload;
+
+# Create a standby linking to the primary using the replication slot
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup($primary, $backup_name, has_streaming => 1);
+$standby2->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb2_slot'
+});
+$standby2->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby2);
+
+# The inactive replication slot info should be null when the slot is active
+my $result = $primary->safe_psql(
+	'postgres', qq[
+    SELECT last_inactive_at IS NULL, inactive_count = 0 AS OK
+    FROM pg_replication_slots WHERE slot_name = 'sb2_slot';
+]);
+is($result, "t|t",
+	'check the inactive replication slot info for an active slot');
+
+# Set timeout so that the next checkpoint will invalidate the inactive
+# replication slot.
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET inactive_replication_slot_timeout TO '1s';
+]);
+$primary->reload;
+
+$logstart = -s $primary->logfile;
+
+# Stop standby to make the replication slot on primary inactive
+$standby2->stop;
+
+# Wait for the inactive replication slot info to be updated
+$primary->poll_query_until(
+	'postgres', qq[
+    SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+    WHERE last_inactive_at IS NOT NULL AND
+    inactive_count = 1 AND slot_name = 'sb2_slot';
+])
+  or die
+  "Timed out while waiting for inactive replication slot info to be updated";
+
+$invalidated = 0;
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$primary->safe_psql('postgres', "CHECKPOINT");
+	if ($primary->log_contains(
+			'invalidating obsolete replication slot "sb2_slot"', $logstart))
+	{
+		$invalidated = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($invalidated, 'check that slot sb2_slot invalidation has been logged');
+
+# Wait for the inactive replication slots to be invalidated.
+$primary->poll_query_until(
+	'postgres', qq[
+    SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+    WHERE slot_name = 'sb2_slot' AND
+    invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for inactive replication slot sb2_slot to be invalidated";
+
 done_testing();
-- 
2.34.1
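For reviewers who want to sanity-check the arithmetic in the new RS_INVAL_INACTIVE_TIMEOUT branch, here is a minimal standalone C sketch of the check. This is my own simplification, not code from the patch: TimestampTz is modeled as raw microseconds, and the `* 1000` in InvalidatePossiblyObsoleteSlot() converts the seconds-valued GUC into the milliseconds that TimestampDifferenceExceeds() expects.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in: PostgreSQL's TimestampTz is microseconds since
 * the Postgres epoch; 0 here plays the role of "never been inactive". */
typedef int64_t TimestampTz;

/* Model of the patch's check: a slot is a candidate for invalidation
 * only when it has a recorded last_inactive_at and has been inactive
 * longer than the seconds-valued inactive_replication_slot_timeout. */
static bool
inactive_timeout_exceeded(TimestampTz last_inactive_at, TimestampTz now,
						  int timeout_s)
{
	if (timeout_s <= 0 || last_inactive_at <= 0)
		return false;			/* GUC disabled, or slot still active */
	/* seconds -> microseconds; cast first to avoid int overflow */
	return (now - last_inactive_at) > (TimestampTz) timeout_s * 1000000;
}
```

In deployment terms this corresponds to setting, say, inactive_replication_slot_timeout = '1d' in postgresql.conf and then watching invalidation_reason in pg_replication_slots after the next checkpoint.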