On Sat, Apr 6, 2024 at 5:10 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> Please see the attached v38 patch.

Hi, thanks everyone for reviewing the design and patches so far. Here
I'm with the v39 patches implementing inactive timeout based (0001)
and XID age based (0002) invalidation mechanisms.

I'm quoting the hackers who are okay with inactive timeout based
invalidation mechanism:
Bertrand Drouvot -
https://www.postgresql.org/message-id/ZgL0N%2BxVJNkyqsKL%40ip-10-97-1-34.eu-west-3.compute.internal
and 
https://www.postgresql.org/message-id/ZgPHDAlM79iLtGIH%40ip-10-97-1-34.eu-west-3.compute.internal
Amit Kapila - 
https://www.postgresql.org/message-id/CAA4eK1L3awyzWMuymLJUm8SoFEQe%3DDa9KUwCcAfC31RNJ1xdJA%40mail.gmail.com
Nathan Bossart -
https://www.postgresql.org/message-id/20240325195443.GA2923888%40nathanxps13
Robert Haas - 
https://www.postgresql.org/message-id/CA%2BTgmoZTbaaEjSZUG1FL0mzxAdN3qmXksO3O9_PZhEuXTkVnRQ%40mail.gmail.com

I'm quoting the hackers who are okay with XID age based invalidation mechanism:
Nathan Bossart -
https://www.postgresql.org/message-id/20240326150918.GB3181099%40nathanxps13
and https://www.postgresql.org/message-id/20240327150557.GA3994937%40nathanxps13
Alvaro Herrera -
https://www.postgresql.org/message-id/202403261539.xcjfle7sksz7%40alvherre.pgsql
Bertrand Drouvot -
https://www.postgresql.org/message-id/ZgPHDAlM79iLtGIH%40ip-10-97-1-34.eu-west-3.compute.internal
Amit Kapila - 
https://www.postgresql.org/message-id/CAA4eK1L3awyzWMuymLJUm8SoFEQe%3DDa9KUwCcAfC31RNJ1xdJA%40mail.gmail.com

There was a point raised by Robert
https://www.postgresql.org/message-id/CA%2BTgmoaRECcnyqxAxUhP5dk2S4HX%3DpGh-p-PkA3uc%2BjG_9hiMw%40mail.gmail.com
for XID age based invalidation. An issue related to
vacuum_defer_cleanup_age
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=be504a3e974d75be6f95c8f9b7367126034f2d12
led to the removal of the GUC
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=1118cd37eb61e6a2428f457a8b2026a7bb3f801a.
The same issue may not happen for the XID age based invaliation. This
is because the XID age is not calculated using FullTransactionId but
using TransactionId as the slot's xmin and catalog_xmin are tracked as
TransactionId.

There was a point raised by Amit
https://www.postgresql.org/message-id/CAA4eK1K8wqLsMw6j0hE_SFoWAeo3Kw8UNnMfhsWaYDF1GWYQ%2Bg%40mail.gmail.com
on when to do the XID age based invalidation - whether in checkpointer
or when vacuum is being run or whenever ComputeXIDHorizons gets called
or in autovacuum process. For now, I've chosen the design to do these
new invalidation checks in two places - 1) whenever the slot is
acquired and the slot acquisition errors out if invalidated, 2) during
checkpoint. However, I'm open to suggestions on this.

I've also verified the case whether the replication_slot_xid_age
setting can help in case of server inching towards the XID wraparound.
I've created a primary and streaming standby setup with
hot_standby_feedback set to on (so that the slot gets an xmin). Then,
I've set replication_slot_xid_age to 2 billion on the primary, and
used xid_wraparound extension to reach XID wraparound on the primary.
Once I start receiving the WARNINGs about VACUUM, I did a checkpoint
after which the slot got invalidated enabling my VACUUM to freeze XIDs
saving my database from XID wraparound problem.

Thanks a lot Masahiko Sawada for an offlist chat about the XID age
calculation logic.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From f3ba2562ba7d9c4f13e283740260025b8d1c9b0f Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 12 Apr 2024 14:52:35 +0000
Subject: [PATCH v39 1/2] Add inactive_timeout based replication slot
 invalidation.

Till now, postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky. Because the amount of WAL a
customer generates, and their allocated storage will vary greatly
in production, making it difficult to pin down a one-size-fits-all
value. It is often easy for developers to set a timeout of say 1
or 2 or 3 days, after which the inactive slots get invalidated.

To achieve the above, postgres introduces a GUC allowing users
set inactive timeout. The replication slots that are inactive
for longer than specified amount of time get invalidated.

The invalidation check happens at various locations to help being
as latest as possible, these locations include the following:
- Whenever the slot is acquired and the slot acquisition errors
out if invalidated.
- During checkpoint

Note that this new invalidation mechanism won't kick-in for the
slots that are currently being synced from the primary to the
standby, because such synced slots are typically considered not
active (for them to be later considered as inactive) as they don't
perform logical decoding to produce the changes.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CA%2BTgmoZTbaaEjSZUG1FL0mzxAdN3qmXksO3O9_PZhEuXTkVnRQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/202403260841.5jcv7ihniccy%40alvherre.pgsql
---
 doc/src/sgml/config.sgml                      |  33 ++
 doc/src/sgml/system-views.sgml                |   7 +
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/slotsync.c    |  11 +-
 src/backend/replication/slot.c                | 188 +++++++++++-
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           |   4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |   2 +-
 src/backend/utils/misc/guc_tables.c           |  12 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/slot.h                |   6 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 286 ++++++++++++++++++
 13 files changed, 535 insertions(+), 20 deletions(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index d8e1282e12..a73677b98b 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4547,6 +4547,39 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-replication-slot-inactive-timeout" xreflabel="replication_slot_inactive_timeout">
+      <term><varname>replication_slot_inactive_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replication_slot_inactive_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidates replication slots that are inactive for longer than
+        specified amount of time. If this value is specified without units,
+        it is taken as seconds. A value of zero (which is default) disables
+        the timeout mechanism. This parameter can only be set in
+        the <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+
+       <para>
+        This invalidation check happens either when the slot is acquired
+        for use or during a checkpoint. The time since the slot has become
+        inactive is known from its
+        <structfield>inactive_since</structfield> value using which the
+        timeout is measured.
+       </para>
+
+       <para>
+        Note that the inactive timeout invalidation mechanism is not
+        applicable for slots on the standby that are being synced from a
+        primary server (whose <structfield>synced</structfield> field is
+        <literal>true</literal>).
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 7ed617170f..063638beda 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2580,6 +2580,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>inactive_timeout</literal> means that the slot has been
+          inactive for the duration specified by
+          <xref linkend="guc-replication-slot-inactive-timeout"/> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b4dd5cce75..56fc1a45a9 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..c47e56f78f 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -450,7 +450,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -653,6 +653,13 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 						   " name slot \"%s\" already exists on the standby",
 						   remote_slot->name));
 
+		/*
+		 * Skip the sync if the local slot is already invalidated. We do this
+		 * beforehand to avoid slot acquire and release.
+		 */
+		if (slot->data.invalidated != RS_INVAL_NONE)
+			return false;
+
 		/*
 		 * The slot has been synchronized before.
 		 *
@@ -669,7 +676,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cebf44bb0f..7cfbc2dfff 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -140,6 +141,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+int			replication_slot_inactive_timeout = 0;
 
 /*
  * This GUC lists streaming replication standby server slot names that
@@ -159,6 +161,13 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
+static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
+										   ReplicationSlot *s,
+										   XLogRecPtr oldestLSN,
+										   Oid dboid,
+										   TransactionId snapshotConflictHorizon,
+										   bool *invalidated);
+
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
@@ -535,12 +544,17 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if check_for_invalidation is true and the slot gets
+ * invalidated now or has been invalidated previously.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait,
+					   bool check_for_invalidation)
 {
 	ReplicationSlot *s;
 	int			active_pid;
+	bool		released_lock = false;
 
 	Assert(name != NULL);
 
@@ -615,6 +629,57 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	/*
+	 * Check if the acquired slot needs to be invalidated. And, error out if
+	 * it gets invalidated now or has been invalidated previously, because
+	 * there's no use in acquiring the invalidated slot.
+	 *
+	 * XXX: Currently we check for inactive_timeout invalidation here. We
+	 * might need to check for other invalidations too.
+	 */
+	if (check_for_invalidation)
+	{
+		bool		invalidated = false;
+
+		released_lock = InvalidatePossiblyObsoleteSlot(RS_INVAL_INACTIVE_TIMEOUT,
+													   s, 0, InvalidOid,
+													   InvalidTransactionId,
+													   &invalidated);
+
+		/*
+		 * If the slot has been invalidated, recalculate the resource limits.
+		 */
+		if (invalidated)
+		{
+			ReplicationSlotsComputeRequiredXmin(false);
+			ReplicationSlotsComputeRequiredLSN();
+		}
+
+		if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT)
+		{
+			/*
+			 * Release the lock if it's not yet to keep the cleanup path on
+			 * error happy.
+			 */
+			if (!released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+
+			Assert(s->inactive_since > 0);
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("can no longer get changes from replication slot \"%s\"",
+							NameStr(s->data.name)),
+					 errdetail("This slot has been invalidated because it was inactive since %s for more than replication_slot_inactive_timeout = %d seconds.",
+							   timestamptz_to_str(s->inactive_since),
+							   replication_slot_inactive_timeout)));
+		}
+	}
+
+	if (!released_lock)
+		LWLockRelease(ReplicationSlotControlLock);
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -781,7 +846,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -804,7 +869,7 @@ ReplicationSlotAlter(const char *name, bool failover)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1476,7 +1541,8 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 					   NameData slotname,
 					   XLogRecPtr restart_lsn,
 					   XLogRecPtr oldestLSN,
-					   TransactionId snapshotConflictHorizon)
+					   TransactionId snapshotConflictHorizon,
+					   TimestampTz inactive_since)
 {
 	StringInfoData err_detail;
 	bool		hint = false;
@@ -1506,6 +1572,13 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			Assert(inactive_since > 0);
+			appendStringInfo(&err_detail,
+							 _("The slot has been inactive since %s for more than replication_slot_inactive_timeout = %d seconds."),
+							 timestamptz_to_str(inactive_since),
+							 replication_slot_inactive_timeout);
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1549,6 +1622,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
+	TimestampTz inactive_since = 0;
 
 	for (;;)
 	{
@@ -1556,6 +1630,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		NameData	slotname;
 		int			active_pid = 0;
 		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
+		TimestampTz now = 0;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1566,6 +1641,18 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			break;
 		}
 
+		if (cause == RS_INVAL_INACTIVE_TIMEOUT &&
+			(replication_slot_inactive_timeout > 0 &&
+			 s->inactive_since > 0 &&
+			 !(RecoveryInProgress() && s->data.synced)))
+		{
+			/*
+			 * We get the current time beforehand to avoid system call while
+			 * holding the spinlock.
+			 */
+			now = GetCurrentTimestamp();
+		}
+
 		/*
 		 * Check if the slot needs to be invalidated. If it needs to be
 		 * invalidated, and is not currently acquired, acquire it and mark it
@@ -1619,6 +1706,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						invalidation_cause = cause;
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+
+					/*
+					 * Quick exit if inactive timeout invalidation mechanism
+					 * is disabled or slot is currently being used or the slot
+					 * on standby is currently being synced from the primary.
+					 *
+					 * Note that we don't invalidate synced slots because,
+					 * they are typically considered not active as they don't
+					 * perform logical decoding to produce the changes.
+					 */
+					if (replication_slot_inactive_timeout == 0 ||
+						s->inactive_since == 0 ||
+						(RecoveryInProgress() && s->data.synced))
+						break;
+
+					/*
+					 * Check if the slot needs to be invalidated due to
+					 * replication_slot_inactive_timeout GUC.
+					 */
+					if (TimestampDifferenceExceeds(s->inactive_since, now,
+												   replication_slot_inactive_timeout * 1000))
+					{
+						invalidation_cause = cause;
+						inactive_since = s->inactive_since;
+
+						/*
+						 * Invalidation due to inactive timeout implies that
+						 * no one is using the slot.
+						 */
+						Assert(s->active_pid == 0);
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1644,11 +1764,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		active_pid = s->active_pid;
 
 		/*
-		 * If the slot can be acquired, do so and mark it invalidated
-		 * immediately.  Otherwise we'll signal the owning process, below, and
-		 * retry.
+		 * If the slot can be acquired, do so or if the slot is already ours,
+		 * then mark it invalidated.  Otherwise we'll signal the owning
+		 * process, below, and retry.
 		 */
-		if (active_pid == 0)
+		if (active_pid == 0 ||
+			(MyReplicationSlot != NULL &&
+			 MyReplicationSlot == s &&
+			 active_pid == MyProcPid))
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
@@ -1703,7 +1826,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			{
 				ReportSlotInvalidation(invalidation_cause, true, active_pid,
 									   slotname, restart_lsn,
-									   oldestLSN, snapshotConflictHorizon);
+									   oldestLSN, snapshotConflictHorizon,
+									   inactive_since);
 
 				if (MyBackendType == B_STARTUP)
 					(void) SendProcSignal(active_pid,
@@ -1749,7 +1873,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 			ReportSlotInvalidation(invalidation_cause, false, active_pid,
 								   slotname, restart_lsn,
-								   oldestLSN, snapshotConflictHorizon);
+								   oldestLSN, snapshotConflictHorizon,
+								   inactive_since);
 
 			/* done with this slot for now */
 			break;
@@ -1772,6 +1897,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_INACTIVE_TIMEOUT: inactive timeout occurs
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -1824,7 +1950,7 @@ restart:
 }
 
 /*
- * Flush all replication slots to disk.
+ * Flush all replication slots to disk and invalidate slots.
  *
  * It is convenient to flush dirty replication slots at the time of checkpoint.
  * Additionally, in case of a shutdown checkpoint, we also identify the slots
@@ -1835,6 +1961,7 @@ void
 CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
+	bool		invalidated = false;
 
 	elog(DEBUG1, "performing replication slot checkpoint");
 
@@ -1884,6 +2011,43 @@ CheckPointReplicationSlots(bool is_shutdown)
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
+
+	elog(DEBUG1, "performing replication slot invalidation");
+
+	/*
+	 * Note that we will make another pass over replication slots for
+	 * invalidations to keep the code simple. The assumption here is that the
+	 * traversal over replication slots isn't that costly even with hundreds
+	 * of replication slots. If it ever turns out that this assumption is
+	 * wrong, we might have to put the invalidation check logic in the above
+	 * loop, for that we might need to do the following:
+	 *
+	 * - Acqure ControlLock lock once before the loop.
+	 *
+	 * - Call InvalidatePossiblyObsoleteSlot for each slot.
+	 *
+	 * - Handle the cases in which ControlLock gets released just like
+	 * InvalidateObsoleteReplicationSlots does.
+	 *
+	 * - Avoid saving slot info to disk two times for each invalidated slot.
+	 *
+	 * XXX: Should we move inactive_timeout inavalidation check closer to
+	 * wal_removed in CreateCheckPoint and CreateRestartPoint?
+	 */
+	invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT,
+													 0,
+													 InvalidOid,
+													 InvalidTransactionId);
+
+	if (invalidated)
+	{
+		/*
+		 * If any slots have been invalidated, recalculate the resource
+		 * limits.
+		 */
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+	}
 }
 
 /*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index dd6c1d5a7e..9ad3e55704 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -539,7 +539,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc40c454de..96eeb8b7d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -846,7 +846,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1459,7 +1459,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index c54b08fe18..82956d58d3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -299,7 +299,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, false);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index c68fdc008b..79e7637ec9 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2982,6 +2982,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"replication_slot_inactive_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time to wait before invalidating an "
+						 "inactive replication slot."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&replication_slot_inactive_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 2166ea4a87..819310b0a7 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -335,6 +335,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#replication_slot_inactive_timeout = 0	# in seconds; 0 disables
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7b937d1a0c..8727b7b58b 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -230,6 +232,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *standby_slot_names;
+extern PGDLLIMPORT int replication_slot_inactive_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -245,7 +248,8 @@ extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, bool failover);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool check_for_invalidation);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..b4c5ce2875 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -10,6 +10,7 @@ tests += {
        'enable_injection_points': get_option('injection_points') ? 'yes' : 'no',
     },
     'tests': [
+      't/050_invalidate_slots.pl',
       't/001_stream_rep.pl',
       't/002_archiving.pl',
       't/003_recovery_targets.pl',
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..4663019c16
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,286 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# =============================================================================
+# Testcase start: Invalidate streaming standby's slot as well as logical
+# failover slot on primary due to replication_slot_inactive_timeout. Also,
+# check the logical failover slot synced on to the standby doesn't invalidate
+# the slot on its own, but gets the invalidated state from the remote slot on
+# the primary.
+
+# Initialize primary node
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Avoid checkpoint during the test, otherwise, the test can get unpredictable
+$primary->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+my $connstr_1 = $primary->connstr;
+$standby1->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
+# Create sync slot on the primary
+$primary->psql('postgres',
+	q{SELECT pg_create_logical_replication_slot('lsub1_sync_slot', 'test_decoding', false, false, true);}
+);
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb1_slot', immediately_reserve := true);
+]);
+
+$standby1->start;
+
+my $standby1_logstart = -s $standby1->logfile;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+# Synchronize the primary server slots to the standby
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slot is created on the standby and is
+# flagged as 'synced'.
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'lsub1_sync_slot' AND synced AND NOT temporary;}
+	),
+	"t",
+	'logical slot lsub1_sync_slot has synced as true on standby');
+
+my $logstart = -s $primary->logfile;
+my $inactive_timeout = 2;
+
+# Set timeout so that the next checkpoint will invalidate the inactive
+# replication slot.
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '${inactive_timeout}s';
+]);
+$primary->reload;
+
+# Wait for the logical failover slot to become inactive on the primary. Note
+# that nobody has acquired that slot yet, so due to
+# replication_slot_inactive_timeout setting above it must get invalidated.
+wait_for_slot_invalidation($primary, 'lsub1_sync_slot', $logstart,
+	$inactive_timeout);
+
+# Set timeout on the standby also to check the synced slots don't get
+# invalidated due to timeout on the standby.
+$standby1->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '2s';
+]);
+$standby1->reload;
+
+# Now, sync the logical failover slot from the remote slot on the primary.
+# Note that the remote slot has already been invalidated due to inactive
+# timeout. Now, the standby must also see it as invalidated.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Wait for the inactive replication slot to be invalidated.
+$standby1->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'lsub1_sync_slot' AND
+		invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for replication slot lsub1_sync_slot invalidation to be synced on standby";
+
+# Synced slot mustn't get invalidated on the standby even after a checkpoint,
+# it must sync invalidation from the primary. So, we must not see the slot's
+# invalidation message in server log.
+$standby1->safe_psql('postgres', "CHECKPOINT");
+ok( !$standby1->log_contains(
+		"invalidating obsolete replication slot \"lsub1_sync_slot\"",
+		$standby1_logstart),
+	'check that syned slot lsub1_sync_slot has not been invalidated on the standby'
+);
+
+# Stop standby to make the standby's replication slot on the primary inactive
+$standby1->stop;
+
+# Wait for the standby's replication slot to become inactive
+wait_for_slot_invalidation($primary, 'sb1_slot', $logstart,
+	$inactive_timeout);
+
+# Testcase end: Invalidate streaming standby's slot as well as logical failover
+# slot on primary due to replication_slot_inactive_timeout. Also, check the
+# logical failover slot synced on to the standby doesn't invalidate the slot on
+# its own, but gets the invalidated state from the remote slot on the primary.
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical subscriber's slot due to
+# replication_slot_inactive_timeout.
+
+my $publisher = $primary;
+
+# Prepare for the next test
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '0';
+]);
+$publisher->reload;
+
+# Create subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('sub');
+$subscriber->init;
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$publisher->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_logical_replication_slot(slot_name := 'lsub1_slot', plugin := 'pgoutput');
+]);
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = 'lsub1_slot', create_slot = false)"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub');
+
+my $result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+# Prepare for the next test
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO ' ${inactive_timeout}s';
+]);
+$publisher->reload;
+
+$logstart = -s $publisher->logfile;
+
+# Stop subscriber to make the replication slot on publisher inactive
+$subscriber->stop;
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# timeout.
+wait_for_slot_invalidation($publisher, 'lsub1_slot', $logstart,
+	$inactive_timeout);
+
+# Testcase end: Invalidate logical subscriber's slot due to
+# replication_slot_inactive_timeout.
+# =============================================================================
+
+sub wait_for_slot_invalidation
+{
+	my ($node, $slot_name, $offset, $inactive_timeout) = @_;
+	my $name = $node->name;
+
+	# Wait for the replication slot to become inactive
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND active = 'f';
+	])
+	  or die
+	  "Timed out while waiting for slot $slot_name to become inactive on node $name";
+
+	# Wait for the replication slot info to be updated
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE inactive_since IS NOT NULL
+				AND slot_name = '$slot_name' AND active = 'f';
+	])
+	  or die
+	  "Timed out while waiting for info of slot $slot_name to be updated on node $name";
+
+	# Sleep at least $inactive_timeout duration to avoid multiple checkpoints
+	# for the slot to get invalidated.
+	sleep($inactive_timeout);
+
+	check_for_slot_invalidation_in_server_log($node, $slot_name, $offset);
+
+	# Wait for the inactive replication slot to be invalidated
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND
+			invalidation_reason = 'inactive_timeout';
+	])
+	  or die
+	  "Timed out while waiting for inactive slot $slot_name to be invalidated on node $name";
+
+	# Check that the invalidated slot cannot be acquired
+	my ($result, $stdout, $stderr);
+
+	($result, $stdout, $stderr) = $node->psql(
+		'postgres', qq[
+			SELECT pg_replication_slot_advance('$slot_name', '0/1');
+	]);
+
+	ok( $stderr =~
+		  /can no longer get changes from replication slot "$slot_name"/,
+		"detected error upon trying to acquire invalidated slot $slot_name on node $name"
+	  )
+	  or die
+	  "could not detect error upon trying to acquire invalidated slot $slot_name on node $name";
+}
+
+# Check for invalidation of slot in server log
+sub check_for_slot_invalidation_in_server_log
+{
+	my ($node, $slot_name, $offset) = @_;
+	my $name = $node->name;
+	my $invalidated = 0;
+
+	for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+	{
+		$node->safe_psql('postgres', "CHECKPOINT");
+		if ($node->log_contains(
+				"invalidating obsolete replication slot \"$slot_name\"",
+				$offset))
+		{
+			$invalidated = 1;
+			last;
+		}
+		usleep(100_000);
+	}
+	ok($invalidated,
+		"check that slot $slot_name invalidation has been logged on node $name"
+	);
+}
+
+done_testing();
-- 
2.34.1

From c6cee7b246583c05e55b1ed5b14d4d786c2d8ddd Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 12 Apr 2024 15:00:05 +0000
Subject: [PATCH v39 2/2] Add XID age based replication slot invalidation.

Till now, postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky. Because the amount of WAL a
customer generates, and their allocated storage will vary greatly
in production, making it difficult to pin down a one-size-fits-all
value. It is often easy for developers to set an XID age (age of
slot's xmin or catalog_xmin) of say 1 or 1.5 billion, after which
the slots get invalidated.

To achieve the above, postgres introduces a GUC allowing users
set slot XID age. The replication slots whose xmin or catalog_xmin
has reached the age specified by this setting get invalidated.

The invalidation check happens at various locations to help being
as latest as possible, these locations include the following:
- Whenever the slot is acquired and the slot acquisition errors
out if invalidated.
- During checkpoint

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/20240327150557.GA3994937%40nathanxps13
Discussion: https://www.postgresql.org/message-id/CA%2BTgmoaRECcnyqxAxUhP5dk2S4HX%3DpGh-p-PkA3uc%2BjG_9hiMw%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  26 ++
 doc/src/sgml/system-views.sgml                |   8 +
 src/backend/replication/slot.c                | 160 +++++++++-
 src/backend/utils/misc/guc_tables.c           |  10 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/slot.h                |   3 +
 src/test/recovery/t/050_invalidate_slots.pl   | 296 +++++++++++++++++-
 7 files changed, 490 insertions(+), 14 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index a73677b98b..f7aee4663f 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4580,6 +4580,32 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-replication-slot-xid-age" xreflabel="replication_slot_xid_age">
+      <term><varname>replication_slot_xid_age</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replication_slot_xid_age</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots whose <literal>xmin</literal> (the oldest
+        transaction that this slot needs the database to retain) or
+        <literal>catalog_xmin</literal> (the oldest transaction affecting the
+        system catalogs that this slot needs the database to retain) has reached
+        the age specified by this setting. A value of zero (which is default)
+        disables this feature. Users can set this value anywhere from zero to
+        two billion. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command
+        line.
+       </para>
+
+       <para>
+        This invalidation check happens either when the slot is acquired
+        for use or during a checkpoint.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 063638beda..05a11a0fe3 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2587,6 +2587,14 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           <xref linkend="guc-replication-slot-inactive-timeout"/> parameter.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>xid_aged</literal> means that the slot's
+          <literal>xmin</literal> or <literal>catalog_xmin</literal>
+          has reached the age specified by
+          <xref linkend="guc-replication-slot-xid-age"/> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 7cfbc2dfff..2029efe5a6 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -108,10 +108,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
 	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
+	[RS_INVAL_XID_AGE] = "xid_aged",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -142,6 +143,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 int			replication_slot_inactive_timeout = 0;
+int			replication_slot_xid_age = 0;
 
 /*
  * This GUC lists streaming replication standby server slot names that
@@ -160,6 +162,9 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static bool ReplicationSlotIsXIDAged(ReplicationSlot *slot,
+									 TransactionId *xmin,
+									 TransactionId *catalog_xmin);
 
 static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 										   ReplicationSlot *s,
@@ -636,8 +641,8 @@ retry:
 	 * it gets invalidated now or has been invalidated previously, because
 	 * there's no use in acquiring the invalidated slot.
 	 *
-	 * XXX: Currently we check for inactive_timeout invalidation here. We
-	 * might need to check for other invalidations too.
+	 * XXX: Currently we check for inactive_timeout and xid_aged invalidations
+	 * here. We might need to check for other invalidations too.
 	 */
 	if (check_for_invalidation)
 	{
@@ -648,6 +653,22 @@ retry:
 													   InvalidTransactionId,
 													   &invalidated);
 
+		if (!invalidated && released_lock)
+		{
+			/* The slot is still ours */
+			Assert(s->active_pid == MyProcPid);
+
+			/* Reacquire the ControlLock */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+			released_lock = false;
+		}
+
+		if (!invalidated)
+			released_lock = InvalidatePossiblyObsoleteSlot(RS_INVAL_XID_AGE,
+														   s, 0, InvalidOid,
+														   InvalidTransactionId,
+														   &invalidated);
+
 		/*
 		 * If the slot has been invalidated, recalculate the resource limits.
 		 */
@@ -657,7 +678,8 @@ retry:
 			ReplicationSlotsComputeRequiredLSN();
 		}
 
-		if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT)
+		if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT ||
+			s->data.invalidated == RS_INVAL_XID_AGE)
 		{
 			/*
 			 * Release the lock if it's not yet to keep the cleanup path on
@@ -665,7 +687,10 @@ retry:
 			 */
 			if (!released_lock)
 				LWLockRelease(ReplicationSlotControlLock);
+		}
 
+		if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT)
+		{
 			Assert(s->inactive_since > 0);
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -675,6 +700,20 @@ retry:
 							   timestamptz_to_str(s->inactive_since),
 							   replication_slot_inactive_timeout)));
 		}
+
+		if (s->data.invalidated == RS_INVAL_XID_AGE)
+		{
+			Assert(TransactionIdIsValid(s->data.xmin) ||
+				   TransactionIdIsValid(s->data.catalog_xmin));
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("can no longer get changes from replication slot \"%s\"",
+							NameStr(s->data.name)),
+					 errdetail("The slot's xmin %u or catalog_xmin %u has reached the age %d specified by replication_slot_xid_age.",
+							   s->data.xmin,
+							   s->data.catalog_xmin,
+							   replication_slot_xid_age)));
+		}
 	}
 
 	if (!released_lock)
@@ -1542,7 +1581,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 					   XLogRecPtr restart_lsn,
 					   XLogRecPtr oldestLSN,
 					   TransactionId snapshotConflictHorizon,
-					   TimestampTz inactive_since)
+					   TimestampTz inactive_since,
+					   TransactionId xmin,
+					   TransactionId catalog_xmin)
 {
 	StringInfoData err_detail;
 	bool		hint = false;
@@ -1579,6 +1620,27 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 							 timestamptz_to_str(inactive_since),
 							 replication_slot_inactive_timeout);
 			break;
+		case RS_INVAL_XID_AGE:
+			Assert(TransactionIdIsValid(xmin) ||
+				   TransactionIdIsValid(catalog_xmin));
+
+			if (TransactionIdIsValid(xmin))
+			{
+				appendStringInfo(&err_detail, _("The slot's xmin %u has reached the age %d specified by replication_slot_xid_age."),
+								 xmin,
+								 replication_slot_xid_age);
+				break;
+			}
+
+			if (TransactionIdIsValid(catalog_xmin))
+			{
+				appendStringInfo(&err_detail, _("The slot's catalog_xmin %u has reached the age %d specified by replication_slot_xid_age."),
+								 catalog_xmin,
+								 replication_slot_xid_age);
+				break;
+			}
+
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1623,6 +1685,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
 	TimestampTz inactive_since = 0;
+	TransactionId aged_xmin = InvalidTransactionId;
+	TransactionId aged_catalog_xmin = InvalidTransactionId;
 
 	for (;;)
 	{
@@ -1739,6 +1803,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 						Assert(s->active_pid == 0);
 					}
 					break;
+				case RS_INVAL_XID_AGE:
+					if (ReplicationSlotIsXIDAged(s, &aged_xmin, &aged_catalog_xmin))
+					{
+						Assert(TransactionIdIsValid(aged_xmin) ||
+							   TransactionIdIsValid(aged_catalog_xmin));
+
+						invalidation_cause = cause;
+						break;
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1827,7 +1901,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 				ReportSlotInvalidation(invalidation_cause, true, active_pid,
 									   slotname, restart_lsn,
 									   oldestLSN, snapshotConflictHorizon,
-									   inactive_since);
+									   inactive_since, aged_xmin,
+									   aged_catalog_xmin);
 
 				if (MyBackendType == B_STARTUP)
 					(void) SendProcSignal(active_pid,
@@ -1874,7 +1949,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			ReportSlotInvalidation(invalidation_cause, false, active_pid,
 								   slotname, restart_lsn,
 								   oldestLSN, snapshotConflictHorizon,
-								   inactive_since);
+								   inactive_since, aged_xmin,
+								   aged_catalog_xmin);
 
 			/* done with this slot for now */
 			break;
@@ -1898,6 +1974,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
  * - RS_INVAL_INACTIVE_TIMEOUT: inactive timeout occurs
+ * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -2031,14 +2108,20 @@ CheckPointReplicationSlots(bool is_shutdown)
 	 *
 	 * - Avoid saving slot info to disk two times for each invalidated slot.
 	 *
-	 * XXX: Should we move inactive_timeout inavalidation check closer to
-	 * wal_removed in CreateCheckPoint and CreateRestartPoint?
+	 * XXX: Should we move inactive_timeout and xid_aged inavalidation checks
+	 * closer to wal_removed in CreateCheckPoint and CreateRestartPoint?
 	 */
 	invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT,
 													 0,
 													 InvalidOid,
 													 InvalidTransactionId);
 
+	if (!invalidated)
+		invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE,
+														 0,
+														 InvalidOid,
+														 InvalidTransactionId);
+
 	if (invalidated)
 	{
 		/*
@@ -2050,6 +2133,65 @@ CheckPointReplicationSlots(bool is_shutdown)
 	}
 }
 
+/*
+ * Returns true if the given replication slot's xmin or catalog_xmin age is
+ * more than replication_slot_xid_age.
+ *
+ * Note that the caller must hold the replication slot's spinlock to avoid
+ * race conditions while this function reads xmin and catalog_xmin.
+ */
+static bool
+ReplicationSlotIsXIDAged(ReplicationSlot *slot, TransactionId *xmin,
+						 TransactionId *catalog_xmin)
+{
+	TransactionId cutoff;
+	TransactionId curr;
+
+	if (replication_slot_xid_age == 0)
+		return false;
+
+	curr = ReadNextTransactionId();
+
+	/*
+	 * Replication slot's xmin and catalog_xmin can never be larger than the
+	 * current transaction id even in the case of transaction ID wraparound.
+	 */
+	Assert(slot->data.xmin <= curr);
+	Assert(slot->data.catalog_xmin <= curr);
+
+	/*
+	 * The cutoff can tell how far we can go back from the current transaction
+	 * id till the age. And then, we check whether or not the xmin or
+	 * catalog_xmin falls within the cutoff; if yes, return true, otherwise
+	 * false.
+	 */
+	cutoff = curr - replication_slot_xid_age;
+
+	if (!TransactionIdIsNormal(cutoff))
+	{
+		cutoff = FirstNormalTransactionId;
+	}
+
+	*xmin = InvalidTransactionId;
+	*catalog_xmin = InvalidTransactionId;
+
+	if (TransactionIdIsNormal(slot->data.xmin) &&
+		TransactionIdPrecedesOrEquals(slot->data.xmin, cutoff))
+	{
+		*xmin = slot->data.xmin;
+		return true;
+	}
+
+	if (TransactionIdIsNormal(slot->data.catalog_xmin) &&
+		TransactionIdPrecedesOrEquals(slot->data.catalog_xmin, cutoff))
+	{
+		*catalog_xmin = slot->data.catalog_xmin;
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * Load all replication slots from disk into memory at server startup. This
  * needs to be run before we start crash recovery.
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 79e7637ec9..ea70e83350 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2994,6 +2994,16 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"replication_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."),
+			gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.")
+		},
+		&replication_slot_xid_age,
+		0, 0, 2000000000,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 819310b0a7..a2387ebd33 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -336,6 +336,7 @@
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
 #replication_slot_inactive_timeout = 0	# in seconds; 0 disables
+#replication_slot_xid_age = 0
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8727b7b58b..19e5dbfb36 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -55,6 +55,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_WAL_LEVEL,
 	/* inactive slot timeout has occurred */
 	RS_INVAL_INACTIVE_TIMEOUT,
+	/* slot's xmin or catalog_xmin has reached the age */
+	RS_INVAL_XID_AGE,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -233,6 +235,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *standby_slot_names;
 extern PGDLLIMPORT int replication_slot_inactive_timeout;
+extern PGDLLIMPORT int replication_slot_xid_age;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
index 4663019c16..da05350df4 100644
--- a/src/test/recovery/t/050_invalidate_slots.pl
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -89,7 +89,7 @@ $primary->reload;
 # that nobody has acquired that slot yet, so due to
 # replication_slot_inactive_timeout setting above it must get invalidated.
 wait_for_slot_invalidation($primary, 'lsub1_sync_slot', $logstart,
-	$inactive_timeout);
+	$inactive_timeout, 'inactive_timeout');
 
 # Set timeout on the standby also to check the synced slots don't get
 # invalidated due to timeout on the standby.
@@ -129,7 +129,7 @@ $standby1->stop;
 
 # Wait for the standby's replication slot to become inactive
 wait_for_slot_invalidation($primary, 'sb1_slot', $logstart,
-	$inactive_timeout);
+	$inactive_timeout, 'inactive_timeout');
 
 # Testcase end: Invalidate streaming standby's slot as well as logical failover
 # slot on primary due to replication_slot_inactive_timeout. Also, check the
@@ -197,15 +197,280 @@ $subscriber->stop;
 # Wait for the replication slot to become inactive and then invalidated due to
 # timeout.
 wait_for_slot_invalidation($publisher, 'lsub1_slot', $logstart,
-	$inactive_timeout);
+	$inactive_timeout, 'inactive_timeout');
 
 # Testcase end: Invalidate logical subscriber's slot due to
 # replication_slot_inactive_timeout.
 # =============================================================================
 
+# =============================================================================
+# Testcase start: Invalidate streaming standby's slot due to replication_slot_xid_age
+# GUC.
+
+# Prepare for the next test
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '0';
+]);
+$primary->reload;
+
+# Create a standby linking to the primary using the replication slot
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+# Enable hs_feedback. The slot should gain an xmin. We set the status interval
+# so we'll see the results promptly.
+$standby2->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb2_slot'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+});
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb2_slot', immediately_reserve := true);
+]);
+
+$standby2->start;
+
+# Create some content on primary to move xmin
+$primary->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a");
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby2);
+
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT xmin IS NOT NULL AND catalog_xmin IS NULL
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = 'sb2_slot';
+]) or die "Timed out waiting for slot sb2_slot xmin to advance";
+
+$primary->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET replication_slot_xid_age = 500;
+]);
+$primary->reload;
+
+# Stop standby to make the replication slot's xmin on primary to age
+$standby2->stop;
+
+$logstart = -s $primary->logfile;
+
+# Do some work to advance xids on primary
+advance_xids($primary, 'tab_int');
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# XID age.
+wait_for_slot_invalidation($primary, 'sb2_slot', $logstart, 0, 'xid_aged');
+
+# Testcase end: Invalidate streaming standby's slot due to replication_slot_xid_age
+# GUC.
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical subscriber's slot due to
+# replication_slot_xid_age GUC.
+
+$publisher = $primary;
+$publisher->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET replication_slot_xid_age = 500;
+]);
+$publisher->reload;
+
+$subscriber->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+));
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl2 (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl2 (id int)");
+
+# Insert some data
+$publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl2 VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+$publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub2 FOR TABLE test_tbl2");
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (slot_name = 'lsub2_slot')"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub2');
+
+$result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl2");
+
+is($result, qq(5), "check initial copy was done");
+
+$publisher->poll_query_until(
+	'postgres', qq[
+	SELECT xmin IS NULL AND catalog_xmin IS NOT NULL
+	FROM pg_catalog.pg_replication_slots
+	WHERE slot_name = 'lsub2_slot';
+]) or die "Timed out waiting for slot lsub2_slot catalog_xmin to advance";
+
+$logstart = -s $publisher->logfile;
+
+# Stop subscriber to make the replication slot on publisher inactive
+$subscriber->stop;
+
+# Do some work to advance xids on publisher
+advance_xids($publisher, 'test_tbl2');
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# XID age.
+wait_for_slot_invalidation($publisher, 'lsub2_slot', $logstart, 0,
+	'xid_aged');
+
+# Testcase end: Invalidate logical subscriber's slot due to
+# replication_slot_xid_age GUC.
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical slot on standby that's being synced from
+# the primary due to replication_slot_xid_age GUC.
+
+$publisher = $primary;
+
+# Prepare for the next test
+$publisher->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET replication_slot_xid_age = 0;
+]);
+$publisher->reload;
+
+# Create a standby linking to the primary using the replication slot
+my $standby3 = PostgreSQL::Test::Cluster->new('standby3');
+$standby3->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+$standby3->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb3_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb3_slot', immediately_reserve := true);
+]);
+
+$standby3->start;
+
+my $standby3_logstart = -s $standby3->logfile;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby3);
+
+$subscriber->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+));
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl3 (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl3 (id int)");
+
+# Insert some data
+$publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl3 VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+$publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub3 FOR TABLE test_tbl3");
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3 WITH (slot_name = 'lsub3_sync_slot', failover = true)"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub3');
+
+$result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl3");
+
+is($result, qq(5), "check initial copy was done");
+
+$publisher->poll_query_until(
+	'postgres', qq[
+	SELECT xmin IS NULL AND catalog_xmin IS NOT NULL
+	FROM pg_catalog.pg_replication_slots
+	WHERE slot_name = 'lsub3_sync_slot';
+])
+  or die "Timed out waiting for slot lsub3_sync_slot catalog_xmin to advance";
+
+# Synchronize the primary server slots to the standby
+$standby3->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slot is created on the standby and is
+# flagged as 'synced' and has got catalog_xmin from the primary.
+is( $standby3->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'lsub3_sync_slot' AND synced AND NOT temporary AND
+			xmin IS NULL AND catalog_xmin IS NOT NULL;}
+	),
+	"t",
+	'logical slot has synced as true on standby');
+
+my $primary_catalog_xmin = $primary->safe_psql('postgres',
+	"SELECT catalog_xmin FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND catalog_xmin IS NOT NULL;"
+);
+
+my $stabdby3_catalog_xmin = $standby3->safe_psql('postgres',
+	"SELECT catalog_xmin FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND catalog_xmin IS NOT NULL;"
+);
+
+is($primary_catalog_xmin, $stabdby3_catalog_xmin,
+	"check catalog_xmin are same for primary slot and synced slot");
+
+# Enable XID age based invalidation on the standby. Note that we disabled the
+# same on the primary to check if the invalidation occurs for synced slot on
+# the standby.
+$standby3->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET replication_slot_xid_age = 500;
+]);
+$standby3->reload;
+
+$logstart = -s $standby3->logfile;
+
+# Do some work to advance xids on primary
+advance_xids($primary, 'test_tbl3');
+
+# Wait for standby to catch up with the above work
+$primary->wait_for_catchup($standby3);
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# XID age.
+wait_for_slot_invalidation($standby3, 'lsub3_sync_slot', $logstart, 0,
+	'xid_aged');
+
+# Note that the replication slot on the primary is still active
+$result = $primary->safe_psql('postgres',
+	"SELECT COUNT(slot_name) = 1 FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND invalidation_reason IS NULL;"
+);
+
+is($result, 't', "check lsub3_sync_slot is still active on primary");
+
+# Testcase end: Invalidate logical slot on standby that's being synced from
+# the primary due to replication_slot_xid_age GUC.
+# =============================================================================
+
 sub wait_for_slot_invalidation
 {
-	my ($node, $slot_name, $offset, $inactive_timeout) = @_;
+	my ($node, $slot_name, $offset, $inactive_timeout, $reason) = @_;
 	my $name = $node->name;
 
 	# Wait for the replication slot to become inactive
@@ -238,7 +503,7 @@ sub wait_for_slot_invalidation
 		'postgres', qq[
 		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
 			WHERE slot_name = '$slot_name' AND
-			invalidation_reason = 'inactive_timeout';
+			invalidation_reason = '$reason';
 	])
 	  or die
 	  "Timed out while waiting for inactive slot $slot_name to be invalidated on node $name";
@@ -283,4 +548,25 @@ sub check_for_slot_invalidation_in_server_log
 	);
 }
 
+# Do some work for advancing xids on a given node
+sub advance_xids
+{
+	my ($node, $table_name) = @_;
+
+	$node->safe_psql(
+		'postgres', qq[
+		do \$\$
+		begin
+		for i in 10000..11000 loop
+			-- use an exception block so that each iteration eats an XID
+			begin
+			insert into $table_name values (i);
+			exception
+			when division_by_zero then null;
+			end;
+		end loop;
+		end\$\$;
+	]);
+}
+
 done_testing();
-- 
2.34.1

Reply via email to