From 239db578c1c84f264b60190078784a5b4f781c0b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 22 Mar 2024 21:07:46 +0000
Subject: [PATCH v15] Track last_inactive_time in pg_replication_slots.

Till now, the time at which the replication slot became inactive
is not tracked directly in pg_replication_slots. This commit adds
a new column 'last_inactive_time' for that for persistent slots.
It is set to 0 whenever a slot is made active/acquired and set to
current timestamp whenever the slot is inactive/released.

The new column will be useful on production servers to debug and
analyze inactive replication slots. It will also help to know the
lifetime of a replication slot - one can know how long a streaming
standby, logical subscriber, or replication slot consumer is down.

The new column will be useful to implement inactive timeout based
replication slot invalidation in a future commit.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 doc/src/sgml/system-views.sgml           |  11 +++
 src/backend/catalog/system_views.sql     |   3 +-
 src/backend/replication/slot.c           |  33 +++++++
 src/backend/replication/slotfuncs.c      |   7 +-
 src/include/catalog/pg_proc.dat          |   6 +-
 src/include/replication/slot.h           |   3 +
 src/test/recovery/meson.build            |   1 +
 src/test/recovery/t/043_replslot_misc.pl | 119 +++++++++++++++++++++++
 src/test/regress/expected/rules.out      |   5 +-
 9 files changed, 181 insertions(+), 7 deletions(-)
 create mode 100644 src/test/recovery/t/043_replslot_misc.pl

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index b5da476c20..2628aaa4db 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2592,6 +2592,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_inactive_time</structfield> <type>timestamptz</type>
+      </para>
+      <para>
+        The time at which the slot became inactive.
+        <literal>NULL</literal> if the slot is currently actively being
+        used or if this is a temporary replication slot.
+      </para></entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f69b7f5580..1bb350cc3c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1026,7 +1026,8 @@ CREATE VIEW pg_replication_slots AS
             L.conflicting,
             L.invalidation_reason,
             L.failover,
-            L.synced
+            L.synced,
+            L.last_inactive_time
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cdf0c450c5..643acc3f05 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
 	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+	slot->last_inactive_time = 0;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -622,6 +623,17 @@ retry:
 	if (SlotIsLogical(s))
 		pgstat_acquire_replslot(s);
 
+	/*
+	 * The slot is active by now, so reset the last inactive time. We don't
+	 * track the last inactive time for non-persistent slots.
+	 */
+	if (s->data.persistency == RS_PERSISTENT)
+	{
+		SpinLockAcquire(&s->mutex);
+		s->last_inactive_time = 0;
+		SpinLockRelease(&s->mutex);
+	}
+
 	if (am_walsender)
 	{
 		ereport(log_replication_commands ? LOG : DEBUG1,
@@ -681,12 +693,23 @@ ReplicationSlotRelease(void)
 
 	if (slot->data.persistency == RS_PERSISTENT)
 	{
+		TimestampTz now;
+
+		/*
+		 * Get current time beforehand to avoid system call while holding the
+		 * lock.
+		 */
+		now = GetCurrentTimestamp();
+
 		/*
 		 * Mark persistent slot inactive.  We're not freeing it, just
 		 * disconnecting, but wake up others that may be waiting for it.
+		 *
+		 * We don't track the last inactive time for non-persistent slots.
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
+		slot->last_inactive_time = now;
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
@@ -2342,6 +2365,16 @@ RestoreSlotFromDisk(const char *name)
 		slot->in_use = true;
 		slot->active_pid = 0;
 
+		/*
+		 * We set last inactive time after loading the slot from the disk into
+		 * memory. Whoever acquires the slot i.e. makes the slot active will
+		 * anyway reset it.
+		 *
+		 * Note that we don't need the slot's persistency check here because
+		 * non-persistent slots don't get saved to disk at all.
+		 */
+		slot->last_inactive_time = GetCurrentTimestamp();
+
 		restored = true;
 		break;
 	}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 4232c1e52e..d115fa88ce 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 18
+#define PG_GET_REPLICATION_SLOTS_COLS 19
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -436,6 +436,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.synced);
 
+		if (slot_contents.last_inactive_time > 0)
+			values[i++] = TimestampTzGetDatum(slot_contents.last_inactive_time);
+		else
+			nulls[i++] = true;
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 71c74350a0..96adf6c5b0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11133,9 +11133,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,invalidation_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,text,bool,bool,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,invalidation_reason,failover,synced,last_inactive_time}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7f25a083ee..2f18433ecc 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -201,6 +201,9 @@ typedef struct ReplicationSlot
 	 * forcibly flushed or not.
 	 */
 	XLogRecPtr	last_saved_confirmed_flush;
+
+	/* The time at which this slot become inactive */
+	TimestampTz last_inactive_time;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..c8259f99d5 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/043_replslot_misc.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/043_replslot_misc.pl b/src/test/recovery/t/043_replslot_misc.pl
new file mode 100644
index 0000000000..bdaaa8bce8
--- /dev/null
+++ b/src/test/recovery/t/043_replslot_misc.pl
@@ -0,0 +1,119 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Replication slot related miscellaneous tests
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# =============================================================================
+# Testcase start: Check last_inactive_time property of streaming standby's slot
+#
+
+# Initialize primary node
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby = PostgreSQL::Test::Cluster->new('standby');
+$standby->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+my $sb_slot = 'sb_slot';
+$standby->append_conf('postgresql.conf', "primary_slot_name = '$sb_slot'");
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := '$sb_slot');
+]);
+
+# Get last_inactive_time value after slot's creation. Note that the slot is still
+# inactive unless it's used by the standby below.
+my $last_inactive_time_1 = $primary->safe_psql('postgres',
+	qq(SELECT last_inactive_time FROM pg_replication_slots WHERE slot_name = '$sb_slot' AND last_inactive_time IS NOT NULL;)
+);
+
+$standby->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby);
+
+# Now the slot is active so last_inactive_time value must be NULL
+is( $primary->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$sb_slot';]
+	),
+	't',
+	'last inactive time for an active physical slot is NULL');
+
+# Stop the standby to check its last_inactive_time value is updated
+$standby->stop;
+
+is( $primary->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time > '$last_inactive_time_1'::timestamptz FROM pg_replication_slots WHERE slot_name = '$sb_slot' AND last_inactive_time IS NOT NULL;]
+	),
+	't',
+	'last inactive time for an inactive physical slot is updated correctly');
+
+# Testcase end: Check last_inactive_time property of streaming standby's slot
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Check last_inactive_time property of logical subscriber's slot
+my $publisher = $primary;
+
+# Create subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('sub');
+$subscriber->init;
+
+# Setup logical replication
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+
+my $lsub_slot = 'lsub_slot';
+$publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot(slot_name := '$lsub_slot', plugin := 'pgoutput');"
+);
+
+# Get last_inactive_time value after slot's creation. Note that the slot is still
+# inactive unless it's used by the subscriber below.
+$last_inactive_time_1 = $primary->safe_psql('postgres',
+	qq(SELECT last_inactive_time FROM pg_replication_slots WHERE slot_name = '$lsub_slot' AND last_inactive_time IS NOT NULL;)
+);
+
+$subscriber->start;
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = '$lsub_slot', create_slot = false)"
+);
+
+# Wait until subscriber has caught up
+$subscriber->wait_for_subscription_sync($publisher, 'sub');
+
+# Now the slot is active so last_inactive_time value must be NULL
+is( $publisher->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$lsub_slot';]
+	),
+	't',
+	'last inactive time for an active logical slot is NULL');
+
+# Stop the subscriber to check its last_inactive_time value is updated
+$subscriber->stop;
+
+is( $publisher->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time > '$last_inactive_time_1'::timestamptz FROM pg_replication_slots WHERE slot_name = '$lsub_slot' AND last_inactive_time IS NOT NULL;]
+	),
+	't',
+	'last inactive time for an inactive logical slot is updated correctly');
+
+# Testcase end: Check last_inactive_time property of logical subscriber's slot
+# =============================================================================
+
+done_testing();
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 18829ea586..c2110e984d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1476,8 +1476,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.conflicting,
     l.invalidation_reason,
     l.failover,
-    l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, invalidation_reason, failover, synced)
+    l.synced,
+    l.last_inactive_time
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, invalidation_reason, failover, synced, last_inactive_time)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

