On Wed, Mar 27, 2024 at 10:08 AM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> Please find the attached v25-0001 (made this 0001 patch now as
> inactive_since patch is committed) patch with the above changes.

Fixed an issue in synchronize_slots where DatumGetLSN was being used in
place of DatumGetTimestampTz. Found this via a CF bot member [1]; it did
not reproduce on my dev system.
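
For reference, here is a minimal sketch of the corrected fetch in
synchronize_slots(). It mirrors the hunk in the attached patch; the
commented-out line shows what the earlier, incorrect call presumably
looked like, per the backtrace below:

    /* inactive_since is a timestamptz column, so fetch it as such */
    d = slot_getattr(tupslot, ++col, &isnull);
    /* before: remote_slot->inactive_since = isnull ? 0 : DatumGetLSN(d); */
    remote_slot->inactive_since = isnull ? 0 : DatumGetTimestampTz(d);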

Please find the attached v26 patch.


[1]
[05:14:39.281] #7  DatumGetLSN (X=<optimized out>) at
../src/include/utils/pg_lsn.h:24
[05:14:39.281] No locals.
[05:14:39.281] #8  synchronize_slots (wrconn=wrconn@entry=0x583cd170)
at ../src/backend/replication/logical/slotsync.c:757
[05:14:39.281]         isnull = false
[05:14:39.281]         remote_slot = 0x583ce1a8
[05:14:39.281]         d = <optimized out>
[05:14:39.281]         col = 10
[05:14:39.281]         slotRow = {25, 25, 3220, 3220, 28, 16, 16, 25, 25, 1184}
[05:14:39.281]         res = 0x583cd1b8
[05:14:39.281]         tupslot = 0x583ce11c
[05:14:39.281]         remote_slot_list = 0x0
[05:14:39.281]         some_slot_updated = false
[05:14:39.281]         started_tx = false
[05:14:39.281]         query = 0x57692bc4 "SELECT slot_name, plugin,
confirmed_flush_lsn, restart_lsn, catalog_xmin, two_phase, failover,
database, invalidation_reason, inactive_since FROM
pg_catalog.pg_replication_slots WHERE failover and NOT"...
[05:14:39.281]         __func__ = "synchronize_slots"
[05:14:39.281] #9  0x56ff9d1e in SyncReplicationSlots
(wrconn=0x583cd170) at
../src/backend/replication/logical/slotsync.c:1504

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 8c5f7d0064e0e01a2d458872ecc1d8e682ddc033 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 27 Mar 2024 05:29:18 +0000
Subject: [PATCH v26] Maintain inactive_since for synced slots correctly.

The slot's inactive_since is not currently maintained for synced
slots on the standby. Commit a11f330b55 prevents updating
inactive_since via a RecoveryInProgress() check in
RestoreSlotFromDisk(). However, RecoveryInProgress() always returns
true in RestoreSlotFromDisk() because 'xlogctl->SharedRecoveryState'
is still 'RECOVERY_STATE_CRASH' at that time. As a result, on a
promoted standby inactive_since is always NULL for all synced slots,
even after a server restart.

The above issue raised the question of why we can't simply update
inactive_since for synced slots on the standby with the value
received from the remote slot on the primary. That would be
consistent with every other slot parameter, all of which are synced
from the primary.

This commit does two things:
1) Updates inactive_since for synced slots with the value received
from the primary's slot.

2) Ensures the value is set to the current timestamp during shutdown
of the slot sync machinery, to help correctly interpret the time if
the standby gets promoted without a restart.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACWLctoiH-pSjWnEpR54q4DED6rw_BRJm5pCx86_Y01MoQ%40mail.gmail.com
---
 doc/src/sgml/system-views.sgml                |  4 ++
 src/backend/replication/logical/slotsync.c    | 61 ++++++++++++++++-
 src/backend/replication/slot.c                | 41 ++++++++----
 .../t/040_standby_failover_slots_sync.pl      | 66 +++++++++++++++++++
 4 files changed, 157 insertions(+), 15 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 3c8dca8ca3..7713f168e7 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2530,6 +2530,10 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       <para>
         The time since the slot has become inactive.
         <literal>NULL</literal> if the slot is currently being used.
+        Note that slots being synced from a primary server (whose
+        <structfield>synced</structfield> field is true) get the
+        <structfield>inactive_since</structfield> value from the
+        corresponding remote slot on the primary.
       </para></entry>
      </row>
 
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..9c95a4b062 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -137,9 +137,12 @@ typedef struct RemoteSlot
 
 	/* RS_INVAL_NONE if valid, or the reason of invalidation */
 	ReplicationSlotInvalidationCause invalidated;
+
+	TimestampTz inactive_since; /* when the remote slot became inactive */
 } RemoteSlot;
 
 static void slotsync_failure_callback(int code, Datum arg);
+static void update_synced_slots_inactive_since(void);
 
 /*
  * If necessary, update the local synced slot's metadata based on the data
@@ -167,6 +170,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		remote_slot->two_phase == slot->data.two_phase &&
 		remote_slot->failover == slot->data.failover &&
 		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
+		remote_slot->inactive_since == slot->inactive_since &&
 		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
 		return false;
 
@@ -182,6 +186,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
 	slot->data.catalog_xmin = remote_slot->catalog_xmin;
 	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+	slot->inactive_since = remote_slot->inactive_since;
 	SpinLockRelease(&slot->mutex);
 
 	if (xmin_changed)
@@ -652,9 +657,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 static bool
 synchronize_slots(WalReceiverConn *wrconn)
 {
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
-	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID, TIMESTAMPTZOID};
 
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
@@ -663,7 +668,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
 		" restart_lsn, catalog_xmin, two_phase, failover,"
-		" database, invalidation_reason"
+		" database, invalidation_reason, inactive_since"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
@@ -743,6 +748,13 @@ synchronize_slots(WalReceiverConn *wrconn)
 		remote_slot->invalidated = isnull ? RS_INVAL_NONE :
 			GetSlotInvalidationCause(TextDatumGetCString(d));
 
+		/*
+		 * It is possible to get a null value for inactive_since if the slot
+		 * is currently active on the primary server, so handle that case.
+		 */
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->inactive_since = isnull ? 0 : DatumGetTimestampTz(d);
+
 		/* Sanity check */
 		Assert(col == SLOTSYNC_COLUMN_COUNT);
 
@@ -1296,6 +1308,46 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	Assert(false);
 }
 
+/*
+ * Update the inactive_since property for synced slots.
+ */
+static void
+update_synced_slots_inactive_since(void)
+{
+	TimestampTz now = 0;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* Check if it is a synchronized slot */
+		if (s->in_use && s->data.synced)
+		{
+			Assert(SlotIsLogical(s));
+
+			/*
+			 * We get the current time beforehand, and only once, to avoid
+			 * system call overhead while holding the lock.
+			 */
+			if (now == 0)
+				now = GetCurrentTimestamp();
+
+			/*
+			 * Set the time since the slot has become inactive after shutting
+			 * down the slot sync machinery. This helps correctly interpret
+			 * the time if the standby gets promoted without a restart.
+			 */
+			SpinLockAcquire(&s->mutex);
+			s->inactive_since = now;
+			SpinLockRelease(&s->mutex);
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Shut down the slot sync worker.
  */
@@ -1309,6 +1361,7 @@ ShutDownSlotSync(void)
 	if (SlotSyncCtx->pid == InvalidPid)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
+		update_synced_slots_inactive_since();
 		return;
 	}
 	SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1341,6 +1394,8 @@ ShutDownSlotSync(void)
 	}
 
 	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	update_synced_slots_inactive_since();
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d778c0b921..5d6882e4db 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -42,6 +42,7 @@
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -655,6 +656,7 @@ ReplicationSlotRelease(void)
 	char	   *slotname = NULL;	/* keep compiler quiet */
 	bool		is_logical = false; /* keep compiler quiet */
 	TimestampTz now = 0;
+	bool		update_inactive_since;
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
@@ -690,13 +692,19 @@ ReplicationSlotRelease(void)
 	}
 
 	/*
-	 * Set the last inactive time after marking the slot inactive. We don't
-	 * set it for the slots currently being synced from the primary to the
-	 * standby because such slots are typically inactive as decoding is not
-	 * allowed on those.
+	 * Set the time since the slot has become inactive.
+	 *
+	 * Note that we don't set it for slots currently being synced from the
+	 * primary to the standby, because such slots get their inactive_since
+	 * from the corresponding remote slot on the primary.
 	 */
 	if (!(RecoveryInProgress() && slot->data.synced))
+	{
 		now = GetCurrentTimestamp();
+		update_inactive_since = true;
+	}
+	else
+		update_inactive_since = false;
 
 	if (slot->data.persistency == RS_PERSISTENT)
 	{
@@ -706,11 +714,12 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
-		slot->inactive_since = now;
+		if (update_inactive_since)
+			slot->inactive_since = now;
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
-	else
+	else if (update_inactive_since)
 	{
 		SpinLockAcquire(&slot->mutex);
 		slot->inactive_since = now;
@@ -2369,13 +2378,21 @@ RestoreSlotFromDisk(const char *name)
 		slot->active_pid = 0;
 
 		/*
-		 * We set the last inactive time after loading the slot from the disk
-		 * into memory. Whoever acquires the slot i.e. makes the slot active
-		 * will reset it. We don't set it for the slots currently being synced
-		 * from the primary to the standby because such slots are typically
-		 * inactive as decoding is not allowed on those.
+		 * Set the time since the slot has become inactive after loading the
+		 * slot from the disk into memory. Whoever acquires the slot i.e.
+		 * makes the slot active will reset it.
+		 *
+		 * Note that we don't set it for slots currently being synced from
+		 * the primary to the standby, because such slots get their
+		 * inactive_since from the corresponding remote slot on the primary.
+		 * We use the InRecovery flag instead of RecoveryInProgress() as the
+		 * latter always returns true at this time, even on the primary.
+		 *
+		 * Note that for synced slots, after the standby starts up (i.e.
+		 * after the slots are loaded from disk), inactive_since remains
+		 * zero until the next slot sync cycle.
 		 */
-		if (!(RecoveryInProgress() && slot->data.synced))
+		if (!(InRecovery && slot->data.synced))
 			slot->inactive_since = GetCurrentTimestamp();
 		else
 			slot->inactive_since = 0;
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..58d5177bad 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -35,6 +35,13 @@ my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
 $subscriber1->init;
 $subscriber1->start;
 
+# Capture the time before the logical failover slot is created on the
+# primary. We later refer to this publisher as the primary.
+my $slot_creation_time_on_primary = $publisher->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 # Create a slot on the publisher with failover disabled
 $publisher->safe_psql('postgres',
 	"SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);"
@@ -174,6 +181,10 @@ $primary->poll_query_until(
 	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
 	1);
 
+# Capture the inactive_since of the slot from the primary
+my $inactive_since_on_primary =
+	capture_and_validate_slot_inactive_since($primary, 'lsub1_slot', $slot_creation_time_on_primary);
+
 # Wait for the standby to catch up so that the standby is not lagging behind
 # the subscriber.
 $primary->wait_for_replay_catchup($standby1);
@@ -190,6 +201,18 @@ is( $standby1->safe_psql(
 	"t",
 	'logical slots have synced as true on standby');
 
+# Capture the inactive_since of the synced slot on the standby
+my $inactive_since_on_standby =
+	capture_and_validate_slot_inactive_since($standby1, 'lsub1_slot', $slot_creation_time_on_primary);
+
+# Synced slots on the standby must get the inactive_since from the primary.
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT '$inactive_since_on_primary'::timestamptz = '$inactive_since_on_standby'::timestamptz;"
+	),
+	"t",
+	'synchronized slot has got the inactive_since from the primary');
+
 ##################################################
 # Test that the synchronized slot will be dropped if the corresponding remote
 # slot on the primary server has been dropped.
@@ -750,8 +773,28 @@ $primary->reload;
 $standby1->start;
 $primary->wait_for_replay_catchup($standby1);
 
+# Capture the time before the standby is promoted
+my $promotion_time_on_primary = $standby1->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 $standby1->promote;
 
+# Capture the inactive_since of the synced slot after the promotion. The
+# expectation is that the slot gets its own inactive_since as part of the
+# promotion. We do this check before the slot is enabled on the new primary
+# below, otherwise the slot becomes active, setting inactive_since to NULL.
+my $inactive_since_on_new_primary =
+	capture_and_validate_slot_inactive_since($standby1, 'lsub1_slot', $promotion_time_on_primary);
+
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz"
+	),
+	"t",
+	'synchronized slot has got its own inactive_since on the new primary');
+
 # Update subscription with the new primary's connection info
 my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
 $subscriber1->safe_psql('postgres',
@@ -773,4 +816,27 @@ is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
 	"20",
 	'data replicated from the new primary');
 
+# Capture and validate inactive_since of a given slot.
+sub capture_and_validate_slot_inactive_since
+{
+	my ($node, $slot_name, $reference_time) = @_;
+	my $name = $node->name;
+
+	my $inactive_since = $node->safe_psql('postgres',
+		qq(SELECT inactive_since FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
+		);
+
+	# Check that the captured time is sane
+	is( $node->safe_psql(
+			'postgres',
+			qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
+				'$inactive_since'::timestamptz >= '$reference_time'::timestamptz;]
+		),
+		't',
+		"last inactive time for slot $slot_name is sane on node $name");
+
+	return $inactive_since;
+}
+
 done_testing();
-- 
2.34.1
