On Wed, Apr 21, 2021 at 7:11 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Wed, Apr 21, 2021 at 3:39 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> >
> > >
> > > The test is not waiting for a new slot creation message to reach the
> > > stats collector. So, if the old slot data still exists in the file and
> > > now when we read stats via backend, then won't there exists a chance
> > > that old slot stats data still exists?
> >
> > You're right. We should wait for the message to reach the collector.
> > Or should we remove that test case?
> >
>
> I feel we can remove it. I am not sure how much value this additional
> test case is adding.

Okay, removed.

I’ve attached the updated patch. Please review it.


Regards,

--
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
From d3d6e7e2c059423c2277d0cc152077a063938391 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 19 Apr 2021 16:40:27 +0900
Subject: [PATCH v12] Use HTAB for replication slot statistics.

Previously, we used an array to store stats for replication slots.
But this had two problems when a message for dropping a slot gets
lost, leaving the stats for the already-dropped slot in the array:
1) the stats for a new slot are not recorded if the array is full,
and 2) after a restart, we write beyond the end of the array if the
number of slots whose stats are stored in the stats file exceeds
max_replication_slots.

This commit changes it to use an HTAB for replication slot statistics,
resolving both problems. In addition, pgstat_vacuum_stat() now searches
the stats hashtable for dead replication slots and tells the collector
to remove them. To avoid showing the stats for already-dropped slots,
the pg_stat_replication_slots view looks up slot stats by the slot
names taken from pg_replication_slots.

Also, we send a message at slot creation to initialize the slot's
stats. This reduces the possibility that the stats are accumulated
into the old slot's stats when a message for dropping a slot gets
lost.
---
 contrib/test_decoding/t/001_repl_stats.pl |  54 +++-
 src/backend/catalog/system_views.sql      |  30 +-
 src/backend/postmaster/pgstat.c           | 319 ++++++++++++----------
 src/backend/replication/logical/logical.c |   2 +-
 src/backend/replication/slot.c            |  26 +-
 src/backend/utils/adt/pgstatfuncs.c       | 145 ++++++----
 src/include/catalog/pg_proc.dat           |  14 +-
 src/include/pgstat.h                      |  10 +-
 src/include/replication/slot.h            |   2 +-
 src/test/regress/expected/rules.out       |   4 +-
 src/tools/pgindent/typedefs.list          |   2 +-
 11 files changed, 359 insertions(+), 249 deletions(-)

diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 11b6cd9b9c..ad19ad83c6 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -2,9 +2,10 @@
 # drop replication slot and restart.
 use strict;
 use warnings;
+use File::Path qw(rmtree);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 1;
+use Test::More tests => 2;
 
 # Test set-up
 my $node = get_new_node('test');
@@ -12,6 +13,20 @@ $node->init(allows_streaming => 'logical');
 $node->append_conf('postgresql.conf', 'synchronous_commit = on');
 $node->start;
 
+# Check that pg_stat_replication_slots shows the expected per-slot stats.
+sub test_slot_stats
+{
+	my ($node, $expected, $msg) = @_;
+
+	my $result = $node->safe_psql(
+		'postgres', qq[
+		SELECT slot_name, total_txns > 0 AS total_txn,
+			   total_bytes > 0 AS total_bytes
+			   FROM pg_stat_replication_slots
+			   ORDER BY slot_name]);
+	is($result, $expected, $msg);
+}
+
 # Create table.
 $node->safe_psql('postgres',
         "CREATE TABLE test_repl_stat(col1 int)");
@@ -57,20 +72,41 @@ $node->start;
 
 # Verify statistics data present in pg_stat_replication_slots are sane after
 # restart.
-my $result = $node->safe_psql('postgres',
-	"SELECT slot_name, total_txns > 0 AS total_txn,
-	total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
-	ORDER BY slot_name"
-);
-is($result, qq(regression_slot1|t|t
+test_slot_stats(
+	$node,
+	qq(regression_slot1|t|t
 regression_slot2|t|t
-regression_slot3|t|t), 'check replication statistics are updated');
+regression_slot3|t|t),
+	'check replication statistics are updated');
+
+# Test removing one of the replication slots and adjusting
+# max_replication_slots according to the number of remaining slots. This
+# leads to a mismatch between the number of slots present in the stats file
+# and the number of slots present in shared memory, simulating the scenario
+# where the drop-slot message is lost by the statistics collector process.
+# We verify that the replication statistics data is sane after restart.
+
+$node->stop;
+my $datadir = $node->data_dir;
+my $slot3_replslotdir = "$datadir/pg_replslot/regression_slot3";
+
+rmtree($slot3_replslotdir);
+
+$node->append_conf('postgresql.conf', 'max_replication_slots = 2');
+$node->start;
+
+# Verify that the statistics in pg_stat_replication_slots are sane after
+# restart.
+test_slot_stats(
+	$node,
+	qq(regression_slot1|t|t
+regression_slot2|t|t),
+	'check replication statistics after removing the slot file');
 
 # cleanup
 $node->safe_psql('postgres', "DROP TABLE test_repl_stat");
 $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
 $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
-$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
 
 # shutdown
 $node->stop;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index e96dd73280..fd95465b04 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -866,20 +866,6 @@ CREATE VIEW pg_stat_replication AS
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
-CREATE VIEW pg_stat_replication_slots AS
-    SELECT
-            s.slot_name,
-            s.spill_txns,
-            s.spill_count,
-            s.spill_bytes,
-            s.stream_txns,
-            s.stream_count,
-            s.stream_bytes,
-            s.total_txns,
-            s.total_bytes,
-            s.stats_reset
-    FROM pg_stat_get_replication_slots() AS s;
-
 CREATE VIEW pg_stat_slru AS
     SELECT
             s.name,
@@ -984,6 +970,22 @@ CREATE VIEW pg_replication_slots AS
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
+CREATE VIEW pg_stat_replication_slots AS
+    SELECT
+            s.slot_name,
+            s.spill_txns,
+            s.spill_count,
+            s.spill_bytes,
+            s.stream_txns,
+            s.stream_count,
+            s.stream_bytes,
+            s.total_txns,
+            s.total_bytes,
+            s.stats_reset
+    FROM pg_replication_slots as r,
+        LATERAL pg_stat_get_replication_slot(slot_name) as s
+    WHERE r.datoid IS NOT NULL; -- excluding physical slots
+
 CREATE VIEW pg_stat_database AS
     SELECT
             D.oid AS datid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e1ec7d8b7d..a5c0fc59dc 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -106,6 +106,7 @@
 #define PGSTAT_DB_HASH_SIZE		16
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
+#define PGSTAT_REPLSLOT_HASH_SIZE	32
 
 
 /* ----------
@@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats;
 static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
-static PgStat_ReplSlotStats *replSlotStats;
-static int	nReplSlotStats;
+static HTAB *replSlotStatHash = NULL;
 static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
 
 /*
@@ -319,8 +319,8 @@ static void backend_read_statsfile(void);
 static bool pgstat_write_statsfile_needed(void);
 static bool pgstat_db_requested(Oid databaseid);
 
-static int	pgstat_replslot_index(const char *name, bool create_it);
-static void pgstat_reset_replslot(int i, TimestampTz ts);
+static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
+static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
 
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
@@ -1109,6 +1109,24 @@ pgstat_vacuum_stat(void)
 	/* Clean up */
 	hash_destroy(htab);
 
+	/*
+	 * Search for all the dead replication slots in stats hashtable and tell
+	 * the stats collector to drop them.
+	 */
+	if (replSlotStatHash)
+	{
+		PgStat_StatReplSlotEntry *slotentry;
+
+		hash_seq_init(&hstat, replSlotStatHash);
+		while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			CHECK_FOR_INTERRUPTS();
+
+			if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
+				pgstat_report_replslot_drop(NameStr(slotentry->slotname));
+		}
+	}
+
 	/*
 	 * Lookup our own database entry; if not found, nothing more to do.
 	 */
@@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name)
 
 	if (name)
 	{
-		ReplicationSlot *slot;
-
-		/*
-		 * Check if the slot exists with the given name. It is possible that by
-		 * the time this message is executed the slot is dropped but at least
-		 * this check will ensure that the given name is for a valid slot.
-		 */
-		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-		slot = SearchNamedReplicationSlot(name);
-		LWLockRelease(ReplicationSlotControlLock);
-
-		if (!slot)
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("replication slot \"%s\" does not exist",
-							name)));
-
-		/*
-		 * Nothing to do for physical slots as we collect stats only for
-		 * logical slots.
-		 */
-		if (SlotIsPhysical(slot))
-			return;
-
 		namestrcpy(&msg.m_slotname, name);
 		msg.clearall = false;
 	}
@@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize)
  * ----------
  */
 void
-pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
+pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -1822,6 +1816,7 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
 	 */
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
 	namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
+	msg.m_create = false;
 	msg.m_drop = false;
 	msg.m_spill_txns = repSlotStat->spill_txns;
 	msg.m_spill_count = repSlotStat->spill_count;
@@ -1834,6 +1829,24 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
+/* ----------
+ * pgstat_report_replslot_create() -
+ *
+ *	Tell the collector about creating the replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_create(const char *slotname)
+{
+	PgStat_MsgReplSlot msg;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+	namestrcpy(&msg.m_slotname, slotname);
+	msg.m_create = true;
+	msg.m_drop = false;
+	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
 /* ----------
  * pgstat_report_replslot_drop() -
  *
@@ -1847,6 +1860,7 @@ pgstat_report_replslot_drop(const char *slotname)
 
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
 	namestrcpy(&msg.m_slotname, slotname);
+	msg.m_create = false;
 	msg.m_drop = true;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
@@ -2872,17 +2886,15 @@ pgstat_fetch_slru(void)
  * pgstat_fetch_replslot() -
  *
  *	Support function for the SQL-callable pgstat* functions. Returns
- *	a pointer to the replication slot statistics struct and sets the
- *	number of entries in nslots_p.
+ *	a pointer to the replication slot statistics struct.
  * ---------
  */
-PgStat_ReplSlotStats *
-pgstat_fetch_replslot(int *nslots_p)
+PgStat_StatReplSlotEntry *
+pgstat_fetch_replslot(NameData slotname)
 {
 	backend_read_statsfile();
 
-	*nslots_p = nReplSlotStats;
-	return replSlotStats;
+	return pgstat_get_replslot_entry(slotname, false);
 }
 
 /*
@@ -3654,7 +3666,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
 	int			rc;
-	int			i;
 
 	elog(DEBUG2, "writing stats file \"%s\"", statfile);
 
@@ -3744,11 +3755,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	/*
 	 * Write replication slot stats struct
 	 */
-	for (i = 0; i < nReplSlotStats; i++)
+	if (replSlotStatHash)
 	{
-		fputc('R', fpout);
-		rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
-		(void) rc;				/* we'll check for error with ferror */
+		PgStat_StatReplSlotEntry *slotent;
+
+		hash_seq_init(&hstat, replSlotStatHash);
+		while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('R', fpout);
+			rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout);
+			(void) rc;				/* we'll check for error with ferror */
+		}
 	}
 
 	/*
@@ -3975,12 +3992,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
 						 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
-	/* Allocate the space for replication slot statistics */
-	replSlotStats = MemoryContextAllocZero(pgStatLocalContext,
-										   max_replication_slots
-										   * sizeof(PgStat_ReplSlotStats));
-	nReplSlotStats = 0;
-
 	/*
 	 * Clear out global, archiver, WAL and SLRU statistics so they start from
 	 * zero in case we can't load an existing statsfile.
@@ -4005,12 +4016,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
 		slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
 
-	/*
-	 * Set the same reset timestamp for all replication slots too.
-	 */
-	for (i = 0; i < max_replication_slots; i++)
-		replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
-
 	/*
 	 * Try to open the stats file. If it doesn't exist, the backends simply
 	 * return zero for anything and the collector simply starts from scratch
@@ -4197,21 +4202,27 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 				break;
 
 				/*
-				 * 'R'	A PgStat_ReplSlotStats struct describing a replication
+				 * 'R'	A PgStat_StatReplSlotEntry struct describing a replication
 				 * slot follows.
 				 */
 			case 'R':
-				if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
-					!= sizeof(PgStat_ReplSlotStats))
 				{
-					ereport(pgStatRunningInCollector ? LOG : WARNING,
-							(errmsg("corrupted statistics file \"%s\"",
-									statfile)));
-					memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-					goto done;
+					PgStat_StatReplSlotEntry slotbuf;
+					PgStat_StatReplSlotEntry *slotent;
+
+					if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
+						!= sizeof(PgStat_StatReplSlotEntry))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					slotent = pgstat_get_replslot_entry(slotbuf.slotname, true);
+					memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry));
+					break;
 				}
-				nReplSlotStats++;
-				break;
 
 			case 'E':
 				goto done;
@@ -4424,7 +4435,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 	PgStat_ArchiverStats myArchiverStats;
 	PgStat_WalStats myWalStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
-	PgStat_ReplSlotStats myReplSlotStats;
+	PgStat_StatReplSlotEntry myReplSlotStats;
 	PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
 	FILE	   *fpin;
 	int32		format_id;
@@ -4553,12 +4564,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 				break;
 
 				/*
-				 * 'R'	A PgStat_ReplSlotStats struct describing a replication
+				 * 'R'	A PgStat_StatReplSlotEntry struct describing a replication
 				 * slot follows.
 				 */
 			case 'R':
-				if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
-					!= sizeof(PgStat_ReplSlotStats))
+				if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
+					!= sizeof(PgStat_StatReplSlotEntry))
 				{
 					ereport(pgStatRunningInCollector ? LOG : WARNING,
 							(errmsg("corrupted statistics file \"%s\"",
@@ -4764,8 +4775,7 @@ pgstat_clear_snapshot(void)
 	/* Reset variables */
 	pgStatLocalContext = NULL;
 	pgStatDBHash = NULL;
-	replSlotStats = NULL;
-	nReplSlotStats = 0;
+	replSlotStatHash = NULL;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -5189,20 +5199,26 @@ static void
 pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 								 int len)
 {
-	int			i;
-	int			idx = -1;
+	PgStat_StatReplSlotEntry *slotent;
 	TimestampTz ts;
 
+	/* Return if we don't have replication slot statistics */
+	if (replSlotStatHash == NULL)
+		return;
+
 	ts = GetCurrentTimestamp();
 	if (msg->clearall)
 	{
-		for (i = 0; i < nReplSlotStats; i++)
-			pgstat_reset_replslot(i, ts);
+		HASH_SEQ_STATUS sstat;
+
+		hash_seq_init(&sstat, replSlotStatHash);
+		while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
+			pgstat_reset_replslot(slotent, ts);
 	}
 	else
 	{
-		/* Get the index of replication slot statistics to reset */
-		idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
+		/* Get the slot statistics to reset */
+		slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
 
 		/*
 		 * Nothing to do if the given slot entry is not found.  This could
@@ -5210,11 +5226,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 		 * corresponding statistics entry is also removed before receiving the
 		 * reset message.
 		 */
-		if (idx < 0)
+		if (!slotent)
 			return;
 
 		/* Reset the stats for the requested replication slot */
-		pgstat_reset_replslot(idx, ts);
+		pgstat_reset_replslot(slotent, ts);
 	}
 }
 
@@ -5532,46 +5548,45 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
 static void
 pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 {
-	int			idx;
-
-	/*
-	 * Get the index of replication slot statistics.  On dropping, we don't
-	 * create the new statistics.
-	 */
-	idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
-
-	/*
-	 * The slot entry is not found or there is no space to accommodate the new
-	 * entry.  This could happen when the message for the creation of a slot
-	 * reached before the drop message even though the actual operations
-	 * happen in reverse order.  In such a case, the next update of the
-	 * statistics for the same slot will create the required entry.
-	 */
-	if (idx < 0)
-		return;
-
-	/* it must be a valid replication slot index */
-	Assert(idx < nReplSlotStats);
-
 	if (msg->m_drop)
 	{
+		Assert(!msg->m_create);
+
 		/* Remove the replication slot statistics with the given name */
-		if (idx < nReplSlotStats - 1)
-			memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
-				   sizeof(PgStat_ReplSlotStats));
-		nReplSlotStats--;
+		if (replSlotStatHash != NULL)
+			(void) hash_search(replSlotStatHash,
+							   (void *) &(msg->m_slotname),
+							   HASH_REMOVE,
+							   NULL);
 	}
 	else
 	{
-		/* Update the replication slot statistics */
-		replSlotStats[idx].spill_txns += msg->m_spill_txns;
-		replSlotStats[idx].spill_count += msg->m_spill_count;
-		replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
-		replSlotStats[idx].stream_txns += msg->m_stream_txns;
-		replSlotStats[idx].stream_count += msg->m_stream_count;
-		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
-		replSlotStats[idx].total_txns += msg->m_total_txns;
-		replSlotStats[idx].total_bytes += msg->m_total_bytes;
+		PgStat_StatReplSlotEntry *slotent;
+
+		slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
+		Assert(slotent);
+
+		if (msg->m_create)
+		{
+			/*
+			 * If the message for dropping the slot with the same name gets
+			 * lost, slotent has stats for the old slot. So we initialize
+			 * all counters at slot creation.
+			 */
+			pgstat_reset_replslot(slotent, 0);
+		}
+		else
+		{
+			/* Update the replication slot statistics */
+			slotent->spill_txns += msg->m_spill_txns;
+			slotent->spill_count += msg->m_spill_count;
+			slotent->spill_bytes += msg->m_spill_bytes;
+			slotent->stream_txns += msg->m_stream_txns;
+			slotent->stream_count += msg->m_stream_count;
+			slotent->stream_bytes += msg->m_stream_bytes;
+			slotent->total_txns += msg->m_total_txns;
+			slotent->total_bytes += msg->m_total_bytes;
+		}
 	}
 }
 
@@ -5749,59 +5764,81 @@ pgstat_db_requested(Oid databaseid)
 }
 
 /* ----------
- * pgstat_replslot_index
+ * pgstat_get_replslot_entry
  *
- * Return the index of entry of a replication slot with the given name, or
- * -1 if the slot is not found.
+ * Return the replication slot stats entry with the given name. Return
+ * NULL if it is not found and the caller didn't request to create it.
  *
  * create_it tells whether to create the new slot entry if it is not found.
  * ----------
  */
-static int
-pgstat_replslot_index(const char *name, bool create_it)
+static PgStat_StatReplSlotEntry *
+pgstat_get_replslot_entry(NameData name, bool create_it)
 {
-	int			i;
+	PgStat_StatReplSlotEntry *slotent;
+	bool	found;
 
-	Assert(nReplSlotStats <= max_replication_slots);
-	for (i = 0; i < nReplSlotStats; i++)
+	if (replSlotStatHash == NULL)
 	{
-		if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
-			return i;			/* found */
+		HASHCTL		hash_ctl;
+
+		/*
+		 * Quick return NULL if the hash table is empty and the caller
+		 * didn't request to create the entry.
+		 */
+		if (!create_it)
+			return NULL;
+
+		hash_ctl.keysize = sizeof(NameData);
+		hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
+		hash_ctl.hcxt = pgStatLocalContext;
+		replSlotStatHash = hash_create("Replication slots hash",
+									PGSTAT_REPLSLOT_HASH_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 	}
 
-	/*
-	 * The slot is not found.  We don't want to register the new statistics if
-	 * the list is already full or the caller didn't request.
-	 */
-	if (i == max_replication_slots || !create_it)
-		return -1;
+	slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
+													   (void *) &name,
+													   create_it ? HASH_ENTER : HASH_FIND,
+													   &found);
 
-	/* Register new slot */
-	memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-	namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
+	if (!slotent)
+	{
+		/* not found */
+		Assert(!create_it && !found);
+		return NULL;
+	}
+
+	/* initialize the entry */
+	if (create_it && !found)
+	{
+		namestrcpy(&(slotent->slotname), NameStr(name));
+		pgstat_reset_replslot(slotent, 0);
+	}
 
-	return nReplSlotStats++;
+	return slotent;
 }
 
 /* ----------
  * pgstat_reset_replslot
  *
- * Reset the replication slot stats at index 'i'.
+ * Reset the given replication slot stats.
  * ----------
  */
 static void
-pgstat_reset_replslot(int i, TimestampTz ts)
+pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
 {
 	/* reset only counters. Don't clear slot name */
-	replSlotStats[i].spill_txns = 0;
-	replSlotStats[i].spill_count = 0;
-	replSlotStats[i].spill_bytes = 0;
-	replSlotStats[i].stream_txns = 0;
-	replSlotStats[i].stream_count = 0;
-	replSlotStats[i].stream_bytes = 0;
-	replSlotStats[i].total_txns = 0;
-	replSlotStats[i].total_bytes = 0;
-	replSlotStats[i].stat_reset_timestamp = ts;
+	slotent->spill_txns = 0;
+	slotent->spill_count = 0;
+	slotent->spill_bytes = 0;
+	slotent->stream_txns = 0;
+	slotent->stream_count = 0;
+	slotent->stream_bytes = 0;
+	slotent->total_txns = 0;
+	slotent->total_bytes = 0;
+	slotent->stat_reset_timestamp = ts;
 }
 
 /*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 35b0c67641..00543ede45 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1773,7 +1773,7 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
 	ReorderBuffer *rb = ctx->reorder;
-	PgStat_ReplSlotStats repSlotStat;
+	PgStat_StatReplSlotEntry repSlotStat;
 
 	/* Nothing to do if we don't have any replication stats to be sent. */
 	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index f61b163f78..c8d8df2889 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -328,12 +328,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * ReplicationSlotAllocationLock.
 	 */
 	if (SlotIsLogical(slot))
-	{
-		PgStat_ReplSlotStats repSlotStat;
-		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
-		namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
-		pgstat_report_replslot(&repSlotStat);
-	}
+		pgstat_report_replslot_create(NameStr(slot->data.name));
 
 	/*
 	 * Now that the slot has been marked as in_use and active, it's safe to
@@ -349,17 +344,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  * Search for the named replication slot.
  *
  * Return the replication slot if found, otherwise NULL.
- *
- * The caller must hold ReplicationSlotControlLock in shared mode.
  */
 ReplicationSlot *
-SearchNamedReplicationSlot(const char *name)
+SearchNamedReplicationSlot(const char *name, bool need_lock)
 {
 	int			i;
 	ReplicationSlot	*slot = NULL;
 
-	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
-								LW_SHARED));
+	if (need_lock)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -372,6 +365,9 @@ SearchNamedReplicationSlot(const char *name)
 		}
 	}
 
+	if (need_lock)
+		LWLockRelease(ReplicationSlotControlLock);
+
 	return slot;
 }
 
@@ -416,7 +412,7 @@ retry:
 	 * Search for the slot with the specified name if the slot to acquire is
 	 * not given. If the slot is not found, we either return -1 or error out.
 	 */
-	s = slot ? slot : SearchNamedReplicationSlot(name);
+	s = slot ? slot : SearchNamedReplicationSlot(name, false);
 	if (s == NULL || !s->in_use)
 	{
 		LWLockRelease(ReplicationSlotControlLock);
@@ -713,6 +709,12 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	 * reduce that possibility. If the messages reached in reverse, we would
 	 * lose one statistics update message. But the next update message will
 	 * create the statistics for the replication slot.
+	 *
+	 * XXX If both the drop message and the create message for slots of the
+	 * same name get lost, and the slot is re-created before (auto)vacuum
+	 * cleans up the dead slot's stats, the stats will be accumulated into the
+	 * old slot's entry. One could assign OIDs to slots to avoid this, but that
+	 * doesn't seem worth doing as in practice this won't happen frequently.
 	 */
 	if (SlotIsLogical(slot))
 		pgstat_report_replslot_drop(NameStr(slot->data.name));
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 2680190a40..f2fd919e57 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
@@ -2207,8 +2208,32 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
 	char	   *target = NULL;
 
 	if (!PG_ARGISNULL(0))
+	{
+		ReplicationSlot *slot;
+
 		target = text_to_cstring(PG_GETARG_TEXT_PP(0));
 
+		/*
+		 * Check if the slot exists with the given name. It is possible that by
+		 * the time this message is executed the slot is dropped but at least
+		 * this check will ensure that the given name is for a valid slot.
+		 */
+		slot = SearchNamedReplicationSlot(target, true);
+
+		if (!slot)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("replication slot \"%s\" does not exist",
+							target)));
+
+		/*
+		 * Nothing to do for physical slots as we collect stats only for
+		 * logical slots.
+		 */
+		if (SlotIsPhysical(slot))
+			PG_RETURN_VOID();
+	}
+
 	pgstat_reset_replslot_counter(target);
 
 	PG_RETURN_VOID();
@@ -2280,73 +2305,77 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
 
-/* Get the statistics for the replication slots */
+/*
+ * Get the statistics for the given replication slot. If no statistics are
+ * available for it, return all-zero stats.
+ */
 Datum
-pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
 #define PG_STAT_GET_REPLICATION_SLOT_COLS 10
-	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	text	   *slotname_text = PG_GETARG_TEXT_PP(0);
+	NameData	slotname;
 	TupleDesc	tupdesc;
-	Tuplestorestate *tupstore;
-	MemoryContext per_query_ctx;
-	MemoryContext oldcontext;
-	PgStat_ReplSlotStats *slotstats;
-	int			nstats;
-	int			i;
+	Datum		values[PG_STAT_GET_REPLICATION_SLOT_COLS];
+	bool		nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
+	PgStat_StatReplSlotEntry *slotent;
+	PgStat_StatReplSlotEntry allzero;
 
-	/* check to see if caller supports us returning a tuplestore */
-	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("set-valued function called in context that cannot accept a set")));
-	if (!(rsinfo->allowedModes & SFRM_Materialize))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("materialize mode required, but it is not allowed in this context")));
-
-	/* Build a tuple descriptor for our result type */
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
-	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
-	oldcontext = MemoryContextSwitchTo(per_query_ctx);
-
-	tupstore = tuplestore_begin_heap(true, false, work_mem);
-	rsinfo->returnMode = SFRM_Materialize;
-	rsinfo->setResult = tupstore;
-	rsinfo->setDesc = tupdesc;
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
 
-	MemoryContextSwitchTo(oldcontext);
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+					   TIMESTAMPTZOID, -1, 0);
+	BlessTupleDesc(tupdesc);
 
-	slotstats = pgstat_fetch_replslot(&nstats);
-	for (i = 0; i < nstats; i++)
+	namestrcpy(&slotname, text_to_cstring(slotname_text));
+	slotent = pgstat_fetch_replslot(slotname);
+	if (!slotent)
 	{
-		Datum		values[PG_STAT_GET_REPLICATION_SLOT_COLS];
-		bool		nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
-		PgStat_ReplSlotStats *s = &(slotstats[i]);
-
-		MemSet(values, 0, sizeof(values));
-		MemSet(nulls, 0, sizeof(nulls));
-
-		values[0] = CStringGetTextDatum(NameStr(s->slotname));
-		values[1] = Int64GetDatum(s->spill_txns);
-		values[2] = Int64GetDatum(s->spill_count);
-		values[3] = Int64GetDatum(s->spill_bytes);
-		values[4] = Int64GetDatum(s->stream_txns);
-		values[5] = Int64GetDatum(s->stream_count);
-		values[6] = Int64GetDatum(s->stream_bytes);
-		values[7] = Int64GetDatum(s->total_txns);
-		values[8] = Int64GetDatum(s->total_bytes);
-
-		if (s->stat_reset_timestamp == 0)
-			nulls[9] = true;
-		else
-			values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
-
-		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+		/*
+		 * No stats entry exists for this slot (e.g. the slot-creation message
+		 * was lost), so report all-zero stats instead.
+		 */
+		memset(&allzero, 0, sizeof(PgStat_StatReplSlotEntry));
+		slotent = &allzero;
 	}
 
-	tuplestore_donestoring(tupstore);
+	values[0] = CStringGetTextDatum(NameStr(slotname));
+	values[1] = Int64GetDatum(slotent->spill_txns);
+	values[2] = Int64GetDatum(slotent->spill_count);
+	values[3] = Int64GetDatum(slotent->spill_bytes);
+	values[4] = Int64GetDatum(slotent->stream_txns);
+	values[5] = Int64GetDatum(slotent->stream_count);
+	values[6] = Int64GetDatum(slotent->stream_bytes);
+	values[7] = Int64GetDatum(slotent->total_txns);
+	values[8] = Int64GetDatum(slotent->total_bytes);
 
-	return (Datum) 0;
+	if (slotent->stat_reset_timestamp == 0)
+		nulls[9] = true;
+	else
+		values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b62abcd22c..e4cd514992 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5308,14 +5308,13 @@
  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
   prosrc => 'pg_stat_get_wal_receiver' },
-{ oid => '8595', descr => 'statistics: information about replication slots',
-  proname => 'pg_stat_get_replication_slots', prorows => '10',
-  proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
-  prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
-  prosrc => 'pg_stat_get_replication_slots' },
+{ oid => '8595', descr => 'statistics: information about replication slot',
+  proname => 'pg_stat_get_replication_slot', provolatile => 's',
+  proparallel => 'r', prorettype => 'record', proargtypes => 'text',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+  prosrc => 'pg_stat_get_replication_slot' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5c5920b0b5..1ce363e7d1 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -541,6 +541,7 @@ typedef struct PgStat_MsgReplSlot
 {
 	PgStat_MsgHdr m_hdr;
 	NameData	m_slotname;
+	bool		m_create;
 	bool		m_drop;
 	PgStat_Counter m_spill_txns;
 	PgStat_Counter m_spill_count;
@@ -917,7 +918,7 @@ typedef struct PgStat_SLRUStats
 /*
  * Replication slot statistics kept in the stats collector
  */
-typedef struct PgStat_ReplSlotStats
+typedef struct PgStat_StatReplSlotEntry
 {
 	NameData	slotname;
 	PgStat_Counter spill_txns;
@@ -929,7 +930,7 @@ typedef struct PgStat_ReplSlotStats
 	PgStat_Counter total_txns;
 	PgStat_Counter total_bytes;
 	TimestampTz stat_reset_timestamp;
-} PgStat_ReplSlotStats;
+} PgStat_StatReplSlotEntry;
 
 
 /*
@@ -1031,7 +1032,8 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
-extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
+extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
+extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
@@ -1129,7 +1131,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
-extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
+extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
 extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1ad5e6c50d..357068403a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
-extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6399f3feef..ce2ecb914b 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2071,7 +2071,9 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.total_txns,
     s.total_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
+   FROM pg_replication_slots r,
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+  WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c7aff677d4..878b67a276 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1870,12 +1870,12 @@ PgStat_MsgTabstat
 PgStat_MsgTempFile
 PgStat_MsgVacuum
 PgStat_MsgWal
-PgStat_ReplSlotStats
 PgStat_SLRUStats
 PgStat_Shared_Reset_Target
 PgStat_Single_Reset_Type
 PgStat_StatDBEntry
 PgStat_StatFuncEntry
+PgStat_StatReplSlotEntry
 PgStat_StatTabEntry
 PgStat_SubXactStatus
 PgStat_TableCounts
-- 
2.24.3 (Apple Git-128)

Reply via email to