Hi,
On Tue, Mar 24, 2026 at 11:50 PM Masahiko Sawada <[email protected]> wrote:
>
> > Please find the v3 patch for further review.
>
> Thank you for updating the patch. I think the patch is reasonably
> simple and can avoid unnecessary overheads well due to XID-based
> checks. Here are some comments:
Thank you for reviewing the patch.
> vacuum_get_cutoff() is also called by VACUUM FULL, CLUSTER, and
> REPACK. I'm not sure that users would expect the slot invalidation
> also in these commands. I think it's better to leave
> vacuum_get_cutoff() a pure cutoff computation function and we can try
> to invalidate slots in heap_vacuum_rel(). It requires additional
> ReadNextTransactionId() but we can live with it, or we can make
> vacuum_get_cutoffs() return the nextXID as well (stored in *cutoffs).
+1. I chose to perform the slot invalidation in heap_vacuum_rel() by
getting the next transaction ID and calling vacuum_get_cutoffs() again
when a slot gets invalidated. IMHO, this is simpler than adding a flag
and doing the invalidation selectively in vacuum_get_cutoffs().
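To make the ordering concrete, here is a standalone C sketch of that control flow. It is not the patch code: MockSlot, mock_oldest_xmin(), mock_invalidate_aged() and vacuum_rel_horizon() are hypothetical stand-ins for vacuum_get_cutoffs(), InvalidateXIDAgedReplicationSlots() and heap_vacuum_rel(). It uses plain integer comparison, so XID wraparound and the OldestXmin precheck are deliberately ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical stand-in for a replication slot; only xmin matters here. */
typedef struct MockSlot
{
	TransactionId xmin;			/* 0 means "no xmin" */
	bool		invalidated;
} MockSlot;

/*
 * Hypothetical stand-in for vacuum_get_cutoffs(): the horizon is the oldest
 * of the oldest running transaction and every still-valid slot's xmin.
 * Plain '<' is used, i.e. XID wraparound is ignored for brevity.
 */
static TransactionId
mock_oldest_xmin(const MockSlot *slots, int nslots, TransactionId oldestRunning)
{
	TransactionId horizon = oldestRunning;

	for (int i = 0; i < nslots; i++)
		if (!slots[i].invalidated && slots[i].xmin != 0 &&
			slots[i].xmin < horizon)
			horizon = slots[i].xmin;
	return horizon;
}

/* Invalidate slots whose xmin age exceeds max_age; report whether any were. */
static bool
mock_invalidate_aged(MockSlot *slots, int nslots, TransactionId nextXID,
					 uint32_t max_age)
{
	bool		any = false;

	for (int i = 0; i < nslots; i++)
	{
		if (!slots[i].invalidated && slots[i].xmin != 0 &&
			nextXID - slots[i].xmin > max_age)
		{
			slots[i].invalidated = true;
			any = true;
		}
	}
	return any;
}

/* Compute cutoffs, try invalidation, recompute only if something changed. */
static TransactionId
vacuum_rel_horizon(MockSlot *slots, int nslots, TransactionId oldestRunning,
				   TransactionId nextXID, uint32_t max_age)
{
	TransactionId oldestXmin = mock_oldest_xmin(slots, nslots, oldestRunning);

	if (max_age != 0 && mock_invalidate_aged(slots, nslots, nextXID, max_age))
		oldestXmin = mock_oldest_xmin(slots, nslots, oldestRunning);
	return oldestXmin;
}
```

For example, with max_age 500 and nextXID 1000, a slot pinned at xmin 100 is invalidated and the horizon moves from 100 to the next-oldest holder. Recomputing only when at least one slot was invalidated keeps the common path to a single cutoff computation.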
> if (TransactionIdPrecedes(oldestXmin, cutoffXID))
> + {
> + invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE,
> + 0,
> + InvalidOid,
> + InvalidTransactionId,
> + nextXID);
> + }
>
> I think it's better to check the procArray->replication_slot_xmin and
> procArray->replication_slot_catalog_xmin before iterating over each
> slot. Otherwise, we would end up checking every slot even when a long
> running transaction holds the oldestxmin back.
+1. Changed.
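The resulting two-stage precheck can be sketched as follows. The helper names (xid_precedes, xid_is_aged, should_scan_slots) are hypothetical; xid_precedes() models TransactionIdPrecedes(), and slots_xmin/slots_catalog_xmin stand for the values that ProcArrayGetReplicationSlotXmin() returns.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FirstNormalTransactionId ((TransactionId) 3)
#define TransactionIdIsNormal(xid) ((xid) >= FirstNormalTransactionId)

/* Wraparound-aware "id1 is older than id2", modeled on TransactionIdPrecedes(). */
static bool
xid_precedes(TransactionId id1, TransactionId id2)
{
	if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
		return id1 < id2;
	return (int32_t) (id1 - id2) < 0;
}

/* True if xid is a normal XID older than nextXID - max_age. */
static bool
xid_is_aged(TransactionId xid, TransactionId nextXID, int max_age)
{
	TransactionId cutoff = nextXID - (TransactionId) max_age;

	if (!TransactionIdIsNormal(cutoff))
		cutoff -= FirstNormalTransactionId;
	return TransactionIdIsNormal(xid) && xid_precedes(xid, cutoff);
}

/*
 * Decide whether the per-slot scan is worthwhile.  oldestXmin already
 * accounts for slot horizons, so if it is not aged no slot can be.  If it
 * is aged, the slot-wide minima tell whether a slot or some long-running
 * transaction is responsible.
 */
static bool
should_scan_slots(TransactionId oldestXmin, TransactionId slots_xmin,
				  TransactionId slots_catalog_xmin,
				  TransactionId nextXID, int max_age)
{
	if (max_age == 0)
		return false;			/* feature disabled */
	if (!TransactionIdIsNormal(oldestXmin) || !TransactionIdIsNormal(nextXID))
		return false;
	if (!xid_is_aged(oldestXmin, nextXID, max_age))
		return false;			/* stage 1: vacuum's horizon is young */
	/* stage 2: is the aged horizon actually caused by a slot? */
	return xid_is_aged(slots_xmin, nextXID, max_age) ||
		xid_is_aged(slots_catalog_xmin, nextXID, max_age);
}
```

With nextXID 1000 and max_age 500, an oldestXmin of 100 held back by a long-running transaction while the oldest slot xmin is 800 skips the scan, which is exactly the case the extra ProcArray check is meant to catch.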
> + if (!TransactionIdIsNormal(cutoffXID))
> + cutoffXID = FirstNormalTransactionId;
>
> These codes have the same comment but are doing a slightly different
> thing. I guess the latter is missing '-'?
Fixed the typo.
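For reference, a standalone model of the corrected normalization is below. The typedef and macros mirror PostgreSQL's transam.h definitions; xid_precedes(), xid_age_cutoff() and xid_is_aged() are hypothetical helper names used only for illustration. Stepping back by FirstNormalTransactionId (rather than assigning it) turns a special cutoff such as 2 into 0xFFFFFFFF, which is still normal, so the comparison stays modulo 2^32.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* XIDs 0..2 are special (invalid, bootstrap, frozen); 3 is the first normal one. */
#define FirstNormalTransactionId ((TransactionId) 3)
#define TransactionIdIsNormal(xid) ((xid) >= FirstNormalTransactionId)

/* Modulo-2^32 "id1 is older than id2", modeled on TransactionIdPrecedes(). */
static bool
xid_precedes(TransactionId id1, TransactionId id2)
{
	if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
		return id1 < id2;		/* plain comparison for special XIDs */
	return (int32_t) (id1 - id2) < 0;
}

/*
 * nextXID - max_age, stepped back by FirstNormalTransactionId when the
 * subtraction lands on a special XID, so the result is always normal
 * (e.g. 2 becomes 0xFFFFFFFF).
 */
static TransactionId
xid_age_cutoff(TransactionId nextXID, int max_age)
{
	TransactionId cutoff = nextXID - (TransactionId) max_age;

	if (!TransactionIdIsNormal(cutoff))
		cutoff -= FirstNormalTransactionId;
	return cutoff;
}

/* True if xmin is a normal XID that precedes the age cutoff. */
static bool
xid_is_aged(TransactionId xmin, TransactionId nextXID, int max_age)
{
	return TransactionIdIsNormal(xmin) &&
		xid_precedes(xmin, xid_age_cutoff(nextXID, max_age));
}
```

For example, with nextXID = 100 and max_age = 500 the cutoff wraps to 4294966896, so an xmin of 4294966000, assigned shortly before the counter wrapped, is reported as aged while an xmin of 50 is not.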
I also fixed a test failure reported by CI.
Please find the attached v4 patch for further review.
I've also attached a 0002 patch with a test case that demonstrates a
production-like scenario: it pushes the database to the XID wraparound
limits with max_slot_xid_age set to the default vacuum_failsafe_age of
1.6B, and checks that XID-age-based invalidation works correctly and
that autovacuum can remove this replication slot blocker, proceed with
freezing, and bring the database back to normal. I don't intend to get
this committed unless others think otherwise, but I wanted to have it
as a reference.
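As a back-of-the-envelope check on the 0002 numbers, the sketch below computes the headroom between the point where a slot becomes eligible for invalidation and the hard wraparound limit, which SetTransactionIdLimit() places MaxTransactionId >> 1 (2^31 - 1) XIDs past the oldest datfrozenxid. It assumes the slot's xmin is what pins the oldest datfrozenxid and ignores the smaller warning and stop margins kept below that limit; the helper names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* As in PostgreSQL's transam.h. */
#define MaxTransactionId ((uint32_t) 0xFFFFFFFF)

/*
 * XIDs left between age == max_slot_xid_age and the hard wraparound limit
 * (oldest datfrozenxid + (MaxTransactionId >> 1)), assuming the slot's xmin
 * is what holds back the oldest datfrozenxid.
 */
static uint32_t
headroom_after_slot_limit(uint32_t max_slot_xid_age)
{
	return (MaxTransactionId >> 1) - max_slot_xid_age;
}

/* Whole consumption chunks that fit into that headroom. */
static uint32_t
chunks_in_headroom(uint32_t max_slot_xid_age, uint32_t chunk)
{
	return headroom_after_slot_limit(max_slot_xid_age) / chunk;
}
```

With max_slot_xid_age = 1600000000 this leaves 547483647 XIDs, i.e. ten full 50M chunks for autovacuum (naptime 1s) to notice and invalidate the slot. Since the 2.2B loop cap is above 2^31 - 1, the loop should only be able to finish if the slot was invalidated and datfrozenxid advanced.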
--
Bharath Rupireddy
Amazon Web Services: https://aws.amazon.com
From 75a9c4562a0166f0d612a00a50597a40172adec8 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 25 Mar 2026 18:44:17 +0000
Subject: [PATCH v4 2/2] Add more tests for XID age slot invalidation
Consume XIDs up to wraparound WARNING limits with
max_slot_xid_age matching vacuum_failsafe_age (1.6B). Verify that
autovacuum invalidates the inactive replication slot
(XID-age-based invalidation), unblocks datfrozenxid advancement,
and prevents wraparound without any intervention.
---
src/test/recovery/Makefile | 3 +-
src/test/recovery/t/019_replslot_limit.pl | 162 ++++++++++++++++++++++
2 files changed, 164 insertions(+), 1 deletion(-)
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index d41aaaf8ae1..5c3d2c89941 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -12,7 +12,8 @@
EXTRA_INSTALL=contrib/pg_prewarm \
contrib/pg_stat_statements \
contrib/test_decoding \
- src/test/modules/injection_points
+ src/test/modules/injection_points \
+ src/test/modules/xid_wraparound
subdir = src/test/recovery
top_builddir = ../../..
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index c87657451bd..220bcfae3ee 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -703,4 +703,166 @@ $primary5->stop;
# GUC.
# =============================================================================
+# =============================================================================
+# Testcase: XID-age-based slot invalidation in a production-like scenario.
+# Standby sets slot xmin via HS feedback, disconnects, XIDs consumed.
+# Autovacuum automatically invalidates the slot once its xmin age exceeds
+# max_slot_xid_age, advances datfrozenxid in all databases, and keeps the
+# system healthy — no manual VACUUM, vacuumdb, or downtime needed.
+
+# Check if autovacuum has invalidated the slot due to xid_aged.
+# Returns 1 if invalidated, 0 otherwise. Early exit when max_slot_xid_age = 0.
+sub check_slot_invalidated
+{
+ my ($node, $slot_name, $max_age, $consumed_xids) = @_;
+
+ return 0 if $max_age == 0;
+
+ my $reason = $node->safe_psql('postgres',
+ "SELECT invalidation_reason FROM pg_replication_slots WHERE slot_name = '$slot_name'");
+ if ($reason eq 'xid_aged')
+ {
+ diag "Slot invalidated by autovacuum after consuming $consumed_xids XIDs";
+ return 1;
+ }
+ return 0;
+}
+
+# Verify server log shows slot invalidation by autovacuum worker with
+# correct xmin, age, and next txid values.
+sub verify_slot_xid_aged_invalidation
+{
+ my ($node, $slot_name, $slot_xmin, $max_age, $consumed_xids) = @_;
+
+ my $log = slurp_file($node->logfile);
+
+ # Verify the invalidation was performed by an autovacuum worker.
+ like($log,
+ qr/autovacuum worker\[\d+\] LOG:\s+invalidating obsolete replication slot "$slot_name"/,
+ "server log: $slot_name invalidated by autovacuum worker");
+
+ # Verify DETAIL shows the correct xmin and max_slot_xid_age.
+ like($log,
+ qr/autovacuum worker\[\d+\] DETAIL:\s+The slot's xmin $slot_xmin at next transaction ID (\d+) exceeds the age $max_age specified by "max_slot_xid_age"/,
+ "server log: DETAIL shows xmin $slot_xmin and age $max_age");
+
+ # Extract next txid from the log and report for diagnostics.
+ $log =~
+ /The slot's xmin $slot_xmin at next transaction ID (\d+) exceeds/;
+ my $log_next_txid = $1 // 'N/A';
+ diag "next_txid from server log=$log_next_txid, max_slot_xid_age=$max_age, consumed=$consumed_xids XIDs";
+}
+
+# Verify slot was invalidated and wait for autovacuum to advance datfrozenxid
+# in all databases. Early exit when max_slot_xid_age = 0.
+sub verify_invalidation_and_recovery
+{
+ my ($node, $slot_name, $slot_xmin, $max_age, $consumed_xids, $slot_gone) = @_;
+
+ return if $max_age == 0;
+
+ ok($slot_gone, 'autovacuum invalidated slot due to xid_aged');
+
+ verify_slot_xid_aged_invalidation($node, $slot_name,
+ $slot_xmin, $max_age, $consumed_xids);
+
+ # Wait for autovacuum to advance datfrozenxid in all databases past the
+ # wraparound danger zone — no manual intervention required.
+ $node->poll_query_until(
+ 'postgres', qq[
+ SELECT NOT EXISTS (
+ SELECT 1 FROM pg_database
+ WHERE age(datfrozenxid) > 2000000000
+ );
+ ]) or die "Timed out waiting for autovacuum to advance datfrozenxid in all databases";
+}
+
+my $primary6 = PostgreSQL::Test::Cluster->new('primary6');
+$primary6->init(allows_streaming => 'logical');
+
+$max_slot_xid_age = 1600000000; # matches vacuum_failsafe_age default
+$primary6->append_conf(
+ 'postgresql.conf', qq{
+max_slot_xid_age = $max_slot_xid_age
+autovacuum_naptime = 1s
+});
+
+$primary6->start;
+$primary6->safe_psql('postgres', "CREATE EXTENSION xid_wraparound");
+
+$backup_name = 'backup6';
+$primary6->backup($backup_name);
+
+my $standby6 = PostgreSQL::Test::Cluster->new('standby6');
+$standby6->init_from_backup($primary6, $backup_name, has_streaming => 1);
+$standby6->append_conf(
+ 'postgresql.conf', q{
+primary_slot_name = 'sb6_slot'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+});
+
+$primary6->safe_psql('postgres',
+ "SELECT pg_create_physical_replication_slot('sb6_slot', true)");
+
+$standby6->start;
+
+$primary6->safe_psql('postgres',
+ "CREATE TABLE tab_int6 AS SELECT generate_series(1,10) AS a");
+$primary6->wait_for_catchup($standby6);
+
+$primary6->poll_query_until(
+ 'postgres', qq[
+ SELECT xmin IS NOT NULL FROM pg_replication_slots
+ WHERE slot_name = 'sb6_slot';
+]) or die "Timed out waiting for sb6_slot xmin from HS feedback";
+
+$result = $primary6->safe_psql('postgres',
+ "SELECT xmin IS NOT NULL FROM pg_replication_slots WHERE slot_name = 'sb6_slot'");
+is($result, 't', 'slot has xmin from hot_standby_feedback');
+
+# Capture the slot's xmin for later log verification.
+my $slot_xmin = $primary6->safe_psql('postgres',
+ "SELECT xmin FROM pg_replication_slots WHERE slot_name = 'sb6_slot'");
+
+# Stop standby; slot xmin persists and holds back datfrozenxid.
+$standby6->stop;
+
+# Consume XIDs in 50M chunks. Once we exceed max_slot_xid_age, autovacuum
+# (naptime=1s) should automatically invalidate the slot. Keep consuming
+# until we see that happen — no manual VACUUM or downtime needed.
+my $logstart6 = -s $primary6->logfile;
+my $chunk = 50_000_000;
+my $max_xids = 2_200_000_000;
+my $consumed = 0;
+my $slot_gone = 0;
+
+while ($consumed < $max_xids)
+{
+ $primary6->safe_psql('postgres', "SELECT consume_xids($chunk)");
+ $consumed += $chunk;
+ my $remaining = $max_xids - $consumed;
+ diag "Consumed $consumed / $max_xids XIDs ($remaining remaining)";
+
+ if (!$slot_gone && check_slot_invalidated($primary6, 'sb6_slot',
+ $max_slot_xid_age, $consumed))
+ {
+ $slot_gone = 1;
+ }
+}
+
+verify_invalidation_and_recovery($primary6, 'sb6_slot',
+ $slot_xmin, $max_slot_xid_age, $consumed, $slot_gone);
+
+# Consume 1B more XIDs. With the 2.2B consumed above, the total of 3.2B
+# exceeds the ~2^31 (~2.1B) XID window allowed by xidStopLimit, which is
+# only possible because autovacuum advanced datfrozenxid in the meantime.
+$primary6->safe_psql('postgres', "SELECT consume_xids(1000000000)");
+ok(1, 'writes succeed after autovacuum invalidated the slot');
+
+$primary6->stop;
+
+# Testcase end: XID-age-based slot invalidation in a production-like scenario.
+# =============================================================================
+
done_testing();
--
2.47.3
From 84fa3816181a5817228b8d4690c484eb8f1650ba Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 25 Mar 2026 18:41:58 +0000
Subject: [PATCH v4 1/2] Add XID age based replication slot invalidation
Introduce max_slot_xid_age, a GUC that invalidates replication
slots whose xmin or catalog_xmin exceeds the specified age.
Disabled by default.
Idle or forgotten replication slots can hold back vacuum, leading
to bloat and eventually XID wraparound. In the worst case this
requires dropping the slot and single-user mode vacuuming. This
setting avoids that by proactively invalidating slots that have
fallen too far behind.
Invalidation checks are performed once per relation during vacuum
(both the VACUUM command and autovacuum).
---
doc/src/sgml/config.sgml | 29 +++
doc/src/sgml/system-views.sgml | 8 +
src/backend/access/heap/vacuumlazy.c | 18 ++
src/backend/access/transam/xlog.c | 5 +-
src/backend/replication/slot.c | 166 +++++++++++++++++-
src/backend/storage/ipc/standby.c | 3 +-
src/backend/utils/misc/guc_parameters.dat | 8 +
src/backend/utils/misc/postgresql.conf.sample | 2 +
src/include/replication/slot.h | 10 +-
src/test/recovery/t/019_replslot_limit.pl | 163 +++++++++++++++++
10 files changed, 402 insertions(+), 10 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 8cdd826fbd3..e04b1384703 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4764,6 +4764,35 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
</listitem>
</varlistentry>
+ <varlistentry id="guc-max-slot-xid-age" xreflabel="max_slot_xid_age">
+ <term><varname>max_slot_xid_age</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_slot_xid_age</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Invalidate replication slots whose <literal>xmin</literal> (the oldest
+ transaction that this slot needs the database to retain) or
+ <literal>catalog_xmin</literal> (the oldest transaction affecting the
+ system catalogs that this slot needs the database to retain) has reached
+ the age specified by this setting. This invalidation check happens
+ during vacuum (both the <command>VACUUM</command> command and autovacuum).
+ A value of zero (which is the default) disables this feature. Users can set
+ this value anywhere from zero to two billion. This parameter can only be
+ set in the <filename>postgresql.conf</filename> file or on the server
+ command line.
+ </para>
+
+ <para>
+ Idle or forgotten replication slots can hold back vacuum, leading to
+ bloat and eventually transaction ID wraparound. This setting avoids
+ that by invalidating slots that have fallen too far behind.
+ See <xref linkend="routine-vacuuming"/> for more details.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
<term><varname>wal_sender_timeout</varname> (<type>integer</type>)
<indexterm>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 9ee1a2bfc6a..1a507b430f9 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -3102,6 +3102,14 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
<xref linkend="guc-idle-replication-slot-timeout"/> duration.
</para>
</listitem>
+ <listitem>
+ <para>
+ <literal>xid_aged</literal> means that the slot's
+ <literal>xmin</literal> or <literal>catalog_xmin</literal>
+ has reached the age specified by
+ <xref linkend="guc-max-slot-xid-age"/> parameter.
+ </para>
+ </listitem>
</itemizedlist>
</para></entry>
</row>
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index f698c2d899b..07c897d5b27 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -147,6 +147,7 @@
#include "pgstat.h"
#include "portability/instr_time.h"
#include "postmaster/autovacuum.h"
+#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/latch.h"
@@ -799,6 +800,23 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
* to increase the number of dead tuples it can prune away.)
*/
vacrel->aggressive = vacuum_get_cutoffs(rel, params, &vacrel->cutoffs);
+
+ /*
+ * Try to invalidate XID-aged replication slots that may interfere with
+ * vacuum's ability to freeze and remove dead tuples. Since OldestXmin
+ * already covers the slot xmin/catalog_xmin values, pass it as a
+ * preliminary check to avoid additional iteration over all the slots.
+ *
+ * If at least one slot was invalidated, recompute cutoffs so that this
+ * vacuum benefits from the advanced horizon immediately.
+ *
+ * XXX: Next XID could be returned as output from vacuum_get_cutoffs() but
+ * for now we live with an additional ReadNextTransactionId() call.
+ */
+ if (InvalidateXIDAgedReplicationSlots(vacrel->cutoffs.OldestXmin,
+ ReadNextTransactionId()))
+ vacrel->aggressive = vacuum_get_cutoffs(rel, params, &vacrel->cutoffs);
+
vacrel->rel_pages = orig_rel_pages = RelationGetNumberOfBlocks(rel);
vacrel->vistest = GlobalVisTestFor(rel);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f5c9a34374d..d5d43620aad 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7443,7 +7443,7 @@ CreateCheckPoint(int flags)
KeepLogSeg(recptr, &_logSegNo);
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
_logSegNo, InvalidOid,
- InvalidTransactionId))
+ InvalidTransactionId, InvalidTransactionId))
{
/*
* Some slots have been invalidated; recalculate the old-segment
@@ -7900,7 +7900,7 @@ CreateRestartPoint(int flags)
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
_logSegNo, InvalidOid,
- InvalidTransactionId))
+ InvalidTransactionId, InvalidTransactionId))
{
/*
* Some slots have been invalidated; recalculate the old-segment
@@ -8764,6 +8764,7 @@ xlog_redo(XLogReaderState *record)
*/
InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
0, InvalidOid,
+ InvalidTransactionId,
InvalidTransactionId);
}
else if (sync_replication_slots)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a9092fc2382..8ca5d86fe0e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -117,6 +117,7 @@ static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
{RS_INVAL_HORIZON, "rows_removed"},
{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
+ {RS_INVAL_XID_AGE, "xid_aged"},
};
/*
@@ -158,6 +159,12 @@ int max_replication_slots = 10; /* the maximum number of replication
*/
int idle_replication_slot_timeout_secs = 0;
+/*
+ * Invalidate replication slots whose xmin or catalog_xmin age exceeds
+ * the specified value; '0' disables it.
+ */
+int max_slot_xid_age = 0;
+
/*
* This GUC lists streaming replication standby server slot names that
* logical WAL sender processes will wait for.
@@ -176,6 +183,8 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
static void ReplicationSlotShmemExit(int code, Datum arg);
static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static bool IsReplicationSlotXIDAged(TransactionId xmin, TransactionId catalog_xmin,
+ TransactionId nextXID);
/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
@@ -1780,7 +1789,10 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
XLogRecPtr restart_lsn,
XLogRecPtr oldestLSN,
TransactionId snapshotConflictHorizon,
- long slot_idle_seconds)
+ long slot_idle_seconds,
+ TransactionId xmin,
+ TransactionId catalog_xmin,
+ TransactionId nextXID)
{
StringInfoData err_detail;
StringInfoData err_hint;
@@ -1825,6 +1837,36 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
"idle_replication_slot_timeout");
break;
}
+
+ case RS_INVAL_XID_AGE:
+ {
+ Assert(TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin));
+
+ if (TransactionIdIsValid(xmin))
+ {
+ /* translator: %s is a GUC variable name */
+ appendStringInfo(&err_detail, _("The slot's xmin %u at next transaction ID %u exceeds the age %d specified by \"%s\"."),
+ xmin,
+ nextXID,
+ max_slot_xid_age,
+ "max_slot_xid_age");
+ }
+ else
+ {
+ /* translator: %s is a GUC variable name */
+ appendStringInfo(&err_detail, _("The slot's catalog xmin %u at next transaction ID %u exceeds the age %d specified by \"%s\"."),
+ catalog_xmin,
+ nextXID,
+ max_slot_xid_age,
+ "max_slot_xid_age");
+ }
+
+ /* translator: %s is a GUC variable name */
+ appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
+ "max_slot_xid_age");
+ break;
+ }
+
case RS_INVAL_NONE:
pg_unreachable();
}
@@ -1874,6 +1916,7 @@ static ReplicationSlotInvalidationCause
DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
XLogRecPtr oldestLSN, Oid dboid,
TransactionId snapshotConflictHorizon,
+ TransactionId nextXID,
TimestampTz *inactive_since, TimestampTz now)
{
Assert(possible_causes != RS_INVAL_NONE);
@@ -1945,6 +1988,11 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
}
}
+ /* Check if the slot needs to be invalidated due to max_slot_xid_age GUC */
+ if ((possible_causes & RS_INVAL_XID_AGE) &&
+ IsReplicationSlotXIDAged(s->data.xmin, s->data.catalog_xmin, nextXID))
+ return RS_INVAL_XID_AGE;
+
return RS_INVAL_NONE;
}
@@ -1967,6 +2015,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
ReplicationSlot *s,
XLogRecPtr oldestLSN,
Oid dboid, TransactionId snapshotConflictHorizon,
+ TransactionId nextXID,
bool *released_lock_out)
{
int last_signaled_pid = 0;
@@ -2019,6 +2068,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
s, oldestLSN,
dboid,
snapshotConflictHorizon,
+ nextXID,
&inactive_since,
now);
@@ -2112,7 +2162,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
ReportSlotInvalidation(invalidation_cause, true, active_pid,
slotname, restart_lsn,
oldestLSN, snapshotConflictHorizon,
- slot_idle_secs);
+ slot_idle_secs, s->data.xmin,
+ s->data.catalog_xmin, nextXID);
if (MyBackendType == B_STARTUP)
(void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
@@ -2165,7 +2216,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
ReportSlotInvalidation(invalidation_cause, false, active_pid,
slotname, restart_lsn,
oldestLSN, snapshotConflictHorizon,
- slot_idle_secs);
+ slot_idle_secs, s->data.xmin,
+ s->data.catalog_xmin, nextXID);
/* done with this slot for now */
break;
@@ -2192,6 +2244,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
* logical.
* - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
* "idle_replication_slot_timeout" duration.
+ * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin age exceeds the
+ * configured "max_slot_xid_age".
*
* Note: This function attempts to invalidate the slot for multiple possible
* causes in a single pass, minimizing redundant iterations. The "cause"
@@ -2205,7 +2259,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
bool
InvalidateObsoleteReplicationSlots(uint32 possible_causes,
XLogSegNo oldestSegno, Oid dboid,
- TransactionId snapshotConflictHorizon)
+ TransactionId snapshotConflictHorizon, TransactionId nextXID)
{
XLogRecPtr oldestLSN;
bool invalidated = false;
@@ -2244,7 +2298,7 @@ restart:
if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
dboid, snapshotConflictHorizon,
- &released_lock))
+ nextXID, &released_lock))
{
Assert(released_lock);
@@ -3275,3 +3329,105 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
ConditionVariableCancelSleep();
}
+
+/*
+ * Check if the passed-in xmin or catalog_xmin has aged beyond the
+ * max_slot_xid_age GUC limit relative to nextXID.
+ *
+ * Returns true if either value exceeds the configured age.
+ */
+static bool
+IsReplicationSlotXIDAged(TransactionId xmin, TransactionId catalog_xmin,
+ TransactionId nextXID)
+{
+ TransactionId cutoffXID;
+ bool aged = false;
+
+ if (max_slot_xid_age == 0)
+ return false;
+
+ if (!TransactionIdIsNormal(nextXID))
+ return false;
+
+ cutoffXID = nextXID - max_slot_xid_age;
+
+ /* ensure it's a "normal" XID, else TransactionIdPrecedes misbehaves */
+ /* this can cause the limit to go backwards by 3, but that's OK */
+ if (cutoffXID < FirstNormalTransactionId)
+ cutoffXID -= FirstNormalTransactionId;
+
+ if (TransactionIdIsNormal(xmin) &&
+ TransactionIdPrecedes(xmin, cutoffXID))
+ aged = true;
+
+ if (TransactionIdIsNormal(catalog_xmin) &&
+ TransactionIdPrecedes(catalog_xmin, cutoffXID))
+ aged = true;
+
+ return aged;
+}
+
+/*
+ * Invalidate replication slots whose XID age exceeds the max_slot_xid_age
+ * GUC.
+ *
+ * The caller supplies oldestXmin (either computed via
+ * GetOldestNonRemovableTransactionId during vacuum, or the minimum of the
+ * slot xmin values obtained from ProcArrayGetReplicationSlotXmin) and
+ * nextXID (the next XID to be assigned, used to compute the age).
+ *
+ * Preliminary checks based on the passed-in oldestXmin and the oldest slot
+ * xmin and catalog_xmin are done to avoid unnecessarily iterating over all
+ * the slots. If the oldestXmin age does not exceed the GUC then no
+ * individual slot can either, so the per-slot scan is skipped. For example,
+ * if oldestXmin is 100 and the GUC is 500, every slot's xmin must be >= 100,
+ * so none can be older than the GUC. Similarly, if the oldest slot xmin and
+ * catalog_xmin from ProcArray are not aged, the per-slot scan is skipped;
+ * this can happen when a long-running transaction holds the oldestXmin back.
+ *
+ * Even if the caller passes an oldestXmin that does not include the slot
+ * xmin/catalog_xmin range, there is no risk of incorrect invalidation: each
+ * slot's own xmin and catalog_xmin are individually verified against the GUC
+ * inside IsReplicationSlotXIDAged(). The only downside is an additional
+ * iteration over all the slots.
+ *
+ * Returns true if at least one slot was invalidated.
+ */
+bool
+InvalidateXIDAgedReplicationSlots(TransactionId oldestXmin, TransactionId nextXID)
+{
+ TransactionId cutoffXID;
+ bool invalidated = false;
+
+ if (max_slot_xid_age == 0)
+ return false;
+
+ if (!TransactionIdIsNormal(oldestXmin) || !TransactionIdIsNormal(nextXID))
+ return false;
+
+ cutoffXID = nextXID - max_slot_xid_age;
+
+ /* ensure it's a "normal" XID, else TransactionIdPrecedes misbehaves */
+ /* this can cause the limit to go backwards by 3, but that's OK */
+ if (!TransactionIdIsNormal(cutoffXID))
+ cutoffXID -= FirstNormalTransactionId;
+
+ if (TransactionIdPrecedes(oldestXmin, cutoffXID))
+ {
+ TransactionId slot_xmin;
+ TransactionId slot_catalog_xmin;
+
+ ProcArrayGetReplicationSlotXmin(&slot_xmin, &slot_catalog_xmin);
+
+ if (IsReplicationSlotXIDAged(slot_xmin, slot_catalog_xmin, nextXID))
+ {
+ invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE,
+ 0,
+ InvalidOid,
+ InvalidTransactionId,
+ nextXID);
+ }
+ }
+
+ return invalidated;
+}
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index de9092fdf5b..d60f39ec08e 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -504,7 +504,8 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
*/
if (IsLogicalDecodingEnabled() && isCatalogRel)
InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
- snapshotConflictHorizon);
+ snapshotConflictHorizon,
+ InvalidTransactionId);
}
/*
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 0c9854ad8fc..7fab02f1eeb 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -2049,6 +2049,14 @@
max => 'MAX_KILOBYTES',
},
+{ name => 'max_slot_xid_age', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_SENDING',
+ short_desc => 'Age of the transaction ID at which a replication slot gets invalidated.',
+ variable => 'max_slot_xid_age',
+ boot_val => '0',
+ min => '0',
+ max => '2000000000',
+},
+
# We use the hopefully-safely-small value of 100kB as the compiled-in
# default for max_stack_depth. InitializeGUCOptions will increase it
# if possible, depending on the actual platform-specific stack limit.
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e4abe6c0077..0f728d87b6c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -351,6 +351,8 @@
#wal_keep_size = 0 # in megabytes; 0 disables
#max_slot_wal_keep_size = -1 # in megabytes; -1 disables
#idle_replication_slot_timeout = 0 # in seconds; 0 disables
+#max_slot_xid_age = 0 # maximum XID age before a replication slot
+ # gets invalidated; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
#track_commit_timestamp = off # collect timestamp of transaction commit
# (change requires restart)
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 4b4709f6e2c..83cf8438724 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -66,10 +66,12 @@ typedef enum ReplicationSlotInvalidationCause
RS_INVAL_WAL_LEVEL = (1 << 2),
/* idle slot timeout has occurred */
RS_INVAL_IDLE_TIMEOUT = (1 << 3),
+ /* slot's xmin or catalog_xmin has reached max xid age */
+ RS_INVAL_XID_AGE = (1 << 4),
} ReplicationSlotInvalidationCause;
/* Maximum number of invalidation causes */
-#define RS_INVAL_MAX_CAUSES 4
+#define RS_INVAL_MAX_CAUSES 5
/*
* When the slot synchronization worker is running, or when
@@ -326,6 +328,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
extern PGDLLIMPORT int max_replication_slots;
extern PGDLLIMPORT char *synchronized_standby_slots;
extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
+extern PGDLLIMPORT int max_slot_xid_age;
/* shmem initialization functions */
extern Size ReplicationSlotsShmemSize(void);
@@ -367,7 +370,8 @@ extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
XLogSegNo oldestSegno,
Oid dboid,
- TransactionId snapshotConflictHorizon);
+ TransactionId snapshotConflictHorizon,
+ TransactionId nextXID);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
extern int ReplicationSlotIndex(ReplicationSlot *slot);
extern bool ReplicationSlotName(int index, Name name);
@@ -387,4 +391,6 @@ extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
+extern bool InvalidateXIDAgedReplicationSlots(TransactionId oldestXmin, TransactionId nextXID);
+
#endif /* SLOT_H */
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index 7b253e64d9c..c87657451bd 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -540,4 +540,167 @@ is( $publisher4->safe_psql(
$publisher4->stop;
$subscriber4->stop;
+# Advance XIDs, run VACUUM, and wait for a slot to be invalidated due to XID age.
+sub invalidate_slot_by_xid_age
+{
+ my ($node, $table_name, $slot_name, $slot_type, $nxids) = @_;
+
+ # Do some work to advance xids
+ $node->safe_psql(
+ 'postgres', qq[
+ do \$\$
+ begin
+ for i in 1..$nxids loop
+ -- use an exception block so that each iteration eats an XID
+ begin
+ insert into $table_name values (i);
+ exception
+ when division_by_zero then null;
+ end;
+ end loop;
+ end\$\$;
+ ]);
+
+ # Trigger slot invalidation via VACUUM
+ $node->safe_psql('postgres', "VACUUM");
+
+ # Wait for the replication slot to be invalidated due to XID age.
+ $node->poll_query_until(
+ 'postgres', qq[
+ SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+ WHERE slot_name = '$slot_name' AND
+ active = false AND
+ invalidation_reason = 'xid_aged';
+ ])
+ or die
+ "Timed out while waiting for slot $slot_name to be invalidated";
+
+ ok(1, "$slot_type replication slot invalidated due to XID age");
+}
+
+# =============================================================================
+# Testcase start: Invalidate streaming standby's slot due to max_slot_xid_age
+# GUC.
+
+# Initialize primary node for XID age tests
+my $primary5 = PostgreSQL::Test::Cluster->new('primary5');
+$primary5->init(allows_streaming => 'logical');
+
+# Configure primary with XID age settings
+my $max_slot_xid_age = 500;
+$primary5->append_conf(
+ 'postgresql.conf', qq{
+max_slot_xid_age = $max_slot_xid_age
+});
+
+$primary5->start;
+
+# Take a backup for creating standby
+$backup_name = 'backup5';
+$primary5->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby5 = PostgreSQL::Test::Cluster->new('standby5');
+$standby5->init_from_backup($primary5, $backup_name, has_streaming => 1);
+
+# Enable HS feedback. The slot should gain an xmin. We set the status interval
+# so we'll see the results promptly.
+$standby5->append_conf(
+ 'postgresql.conf', q{
+primary_slot_name = 'sb5_slot'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+});
+
+$primary5->safe_psql(
+ 'postgres', qq[
+ SELECT pg_create_physical_replication_slot(slot_name := 'sb5_slot', immediately_reserve := true);
+]);
+
+$standby5->start;
+
+# Create some content on primary to move xmin
+$primary5->safe_psql('postgres',
+ "CREATE TABLE tab_int5 AS SELECT generate_series(1,10) AS a");
+
+# Wait until standby has replayed enough data
+$primary5->wait_for_catchup($standby5);
+
+# Wait for the slot to get xmin from hot_standby_feedback
+$primary5->poll_query_until(
+ 'postgres', qq[
+ SELECT xmin IS NOT NULL
+ FROM pg_catalog.pg_replication_slots
+ WHERE slot_name = 'sb5_slot';
+]) or die "Timed out waiting for slot sb5_slot xmin from HS feedback";
+
+# Stop standby to make the replication slot on primary inactive.
+# The slot's xmin persists and holds back datfrozenxid.
+$standby5->stop;
+
+# Advance XIDs and wait for the slot to be invalidated due to XID age.
+# Use 2x the max_slot_xid_age to ensure the slot's xmin age comfortably
+# exceeds the configured limit.
+invalidate_slot_by_xid_age($primary5, 'tab_int5', 'sb5_slot', 'physical',
+ 2 * $max_slot_xid_age);
+
+# Testcase end: Invalidate streaming standby's slot due to max_slot_xid_age
+# GUC.
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical subscriber's slot due to max_slot_xid_age
+# GUC.
+
+# Create a subscriber node
+my $subscriber5 = PostgreSQL::Test::Cluster->new('subscriber5');
+$subscriber5->init(allows_streaming => 'logical');
+$subscriber5->start;
+
+# Create tables on both primary and subscriber
+$primary5->safe_psql('postgres', "CREATE TABLE test_tbl5 (id int)");
+$subscriber5->safe_psql('postgres', "CREATE TABLE test_tbl5 (id int)");
+
+# Insert some initial data
+$primary5->safe_psql('postgres',
+ "INSERT INTO test_tbl5 VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $primary5_connstr = $primary5->connstr . ' dbname=postgres';
+$primary5->safe_psql('postgres',
+ "CREATE PUBLICATION pub5 FOR TABLE test_tbl5");
+
+$subscriber5->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub5 CONNECTION '$primary5_connstr' PUBLICATION pub5 WITH (slot_name = 'lsub5_slot')"
+);
+
+# Wait for initial sync to complete
+$subscriber5->wait_for_subscription_sync($primary5, 'sub5');
+
+$result = $subscriber5->safe_psql('postgres', "SELECT count(*) FROM test_tbl5");
+is($result, qq(5), "check initial copy was done for logical replication (XID age test)");
+
+# Wait for the logical slot to get catalog_xmin (logical slots use catalog_xmin, not xmin)
+$primary5->poll_query_until(
+ 'postgres', qq[
+ SELECT xmin IS NULL AND catalog_xmin IS NOT NULL
+ FROM pg_catalog.pg_replication_slots
+ WHERE slot_name = 'lsub5_slot';
+]) or die "Timed out waiting for slot lsub5_slot catalog_xmin to advance";
+
+# Stop subscriber to make the replication slot on primary inactive
+$subscriber5->stop;
+
+# Advance XIDs and wait for the slot to be invalidated due to XID age.
+# Use 2x the max_slot_xid_age to ensure the slot's catalog_xmin age
+# comfortably exceeds the configured limit.
+invalidate_slot_by_xid_age($primary5, 'test_tbl5', 'lsub5_slot', 'logical',
+ 2 * $max_slot_xid_age);
+
+$primary5->stop;
+
+# Testcase end: Invalidate logical subscriber's slot due to max_slot_xid_age
+# GUC.
+# =============================================================================
+
done_testing();
--
2.47.3