On Wed, Apr 14, 2021 at 12:09 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Tue, Apr 13, 2021 at 1:37 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > On Mon, Apr 12, 2021 at 7:03 PM Masahiko Sawada <sawada.m...@gmail.com> 
> > wrote:
> > >
> > >
> > > The following test for the latest v8 patch seems to show different
> > > results: total_bytes is 1808 whereas spill_bytes is 13200000. Am I
> > > missing something?
> > >
> > > postgres(1:85969)=# select pg_create_logical_replication_slot('s',
> > > 'test_decoding');
> > >  pg_create_logical_replication_slot
> > > ------------------------------------
> > >  (s,0/1884468)
> > > (1 row)
> > >
> > > postgres(1:85969)=# create table a (i int);
> > > CREATE TABLE
> > > postgres(1:85969)=# insert into a select generate_series(1, 100000);
> > > INSERT 0 100000
> > > postgres(1:85969)=# set logical_decoding_work_mem to 64;
> > > SET
> > > postgres(1:85969)=# select * from pg_stat_replication_slots ;
> > >  slot_name | total_txns | total_bytes | spill_txns | spill_count |
> > > spill_bytes | stream_txns | stream_count | stream_bytes | stats_reset
> > > -----------+------------+-------------+------------+-------------+-------------+-------------+--------------+--------------+-------------
> > >  s         |          0 |           0 |          0 |           0 |
> > >       0 |           0 |            0 |            0 |
> > > (1 row)
> > >
> > > postgres(1:85969)=# select count(*) from
> > > pg_logical_slot_peek_changes('s', NULL, NULL);
> > >  count
> > > --------
> > >  100004
> > > (1 row)
> > >
> > > postgres(1:85969)=# select * from pg_stat_replication_slots ;
> > >  slot_name | total_txns | total_bytes | spill_txns | spill_count |
> > > spill_bytes | stream_txns | stream_count | stream_bytes | stats_reset
> > > -----------+------------+-------------+------------+-------------+-------------+-------------+--------------+--------------+-------------
> > >  s         |          2 |        1808 |          1 |         202 |
> > > 13200000 |           0 |            0 |            0 |
> > > (1 row)
> > >
> >
> > Thanks for identifying this issue. While spilling a transaction, its
> > reorder buffer changes get released, so we are not able to get the
> > total size of spilled transactions from the reorder buffer size. I have
> > fixed it by adding the spilled bytes to the total bytes in the case of
> > spilled transactions. The attached patch has the fix for this.
> > Thoughts?
> >
>
> I am not sure if that is the best way to fix it, because sometimes we
> clear the serialized flag, in which case it might not give the correct
> answer. Another way to fix it could be to update the totalBytes counter
> before we try to restore a new set of changes. See the attached
> patch atop your v6-0002-* patch.

I feel that calculating totalBytes this way is better than depending on
spill_bytes. I have taken your changes; the attached patch includes the
suggested changes.
Thoughts?

Regards,
Vignesh
From 9a4ca0fcef85d000856339cfcb2a58eb87ee5e72 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Wed, 14 Apr 2021 10:08:13 +0530
Subject: [PATCH v10 1/4] Added total txns and total txn bytes to replication
 statistics.

This adds statistics about the total number of transactions and the total
amount of transaction data sent to the decoding output plugin from
ReorderBuffer.
Users can query the pg_stat_replication_slots view to check these stats.
---
 contrib/test_decoding/expected/stats.out      | 79 +++++++++++++------
 contrib/test_decoding/sql/stats.sql           | 48 +++++++----
 doc/src/sgml/monitoring.sgml                  | 25 ++++++
 src/backend/catalog/system_views.sql          |  2 +
 src/backend/postmaster/pgstat.c               |  6 ++
 src/backend/replication/logical/logical.c     | 16 ++--
 .../replication/logical/reorderbuffer.c       | 17 ++++
 src/backend/utils/adt/pgstatfuncs.c           |  8 +-
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/pgstat.h                          |  4 +
 src/include/replication/reorderbuffer.h       |  4 +
 src/test/regress/expected/rules.out           |  4 +-
 12 files changed, 170 insertions(+), 49 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index bca36fa903..bc8e601eab 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 
 CREATE TABLE stats_test(data text);
 -- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
 DECLARE
   start_time timestamptz := clock_timestamp();
   updated bool;
@@ -16,12 +16,25 @@ BEGIN
   -- we don't want to wait forever; loop will exit after 30 seconds
   FOR i IN 1 .. 300 LOOP
 
-    -- check to see if all updates have been reset/updated
-    SELECT CASE WHEN check_reset THEN (spill_txns = 0)
-                ELSE (spill_txns > 0)
-           END
-    INTO updated
-    FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+    IF check_spill_txns THEN
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+                  ELSE (spill_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    ELSE
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (total_txns = 0)
+                  ELSE (total_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    END IF;
 
     exit WHEN updated;
 
@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
 -- Check stats, wait for the stats collector to update. We can't test the
 -- exact stats count as that can vary if any background transaction (say by
 -- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot | t          | t
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t          | t           | t          | t
 (1 row)
 
 -- reset the slot stats, and wait for stats collector to reset
@@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot');
  
 (1 row)
 
-SELECT wait_for_decode_stats(true);
+SELECT wait_for_decode_stats(true, true);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot |          0 |           0
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot |          0 |           0 |          0 |           0
 (1 row)
 
 -- decode and check stats again.
@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
   5002
 (1 row)
 
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
+ wait_for_decode_stats 
+-----------------------
+ 
+(1 row)
+
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t          | t           | t          | t
+(1 row)
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+ pg_stat_reset_replication_slot 
+--------------------------------
+ 
+(1 row)
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot | t          | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | f          | f           | t          | t
 (1 row)
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
@@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots;
 (1 row)
 
 COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 51294e48e8..8c34aeced1 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -6,7 +6,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 CREATE TABLE stats_test(data text);
 
 -- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
 DECLARE
   start_time timestamptz := clock_timestamp();
   updated bool;
@@ -14,12 +14,25 @@ BEGIN
   -- we don't want to wait forever; loop will exit after 30 seconds
   FOR i IN 1 .. 300 LOOP
 
-    -- check to see if all updates have been reset/updated
-    SELECT CASE WHEN check_reset THEN (spill_txns = 0)
-                ELSE (spill_txns > 0)
-           END
-    INTO updated
-    FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+    IF check_spill_txns THEN
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+                  ELSE (spill_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    ELSE
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (total_txns = 0)
+                  ELSE (total_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    END IF;
 
     exit WHEN updated;
 
@@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
 -- Check stats, wait for the stats collector to update. We can't test the
 -- exact stats count as that can vary if any background transaction (say by
 -- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
 
 -- reset the slot stats, and wait for stats collector to reset
 SELECT pg_stat_reset_replication_slot('regression_slot');
-SELECT wait_for_decode_stats(true);
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(true, true);
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
 
 -- decode and check stats again.
 SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
@@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots;
 SELECT slot_name FROM pg_stat_replication_slots;
 COMMIT;
 
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 8287587f61..25024dfc7c 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2716,6 +2716,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent to the decoding output plugin for
+        this slot. This counts top-level transactions only, so the counter
+        is not incremented for subtransactions. Note that this includes the
+        transactions that are streamed and/or spilled.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded transaction data sent to the decoding output plugin
+        while decoding the changes from WAL for this slot. This can be used to
+        gauge the total amount of data sent during logical decoding. Note that
+        this includes the data that is streamed and/or spilled.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 451db2ee0a..6d78b33590 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_txns,
             s.stream_count,
             s.stream_bytes,
+            s.total_txns,
+            s.total_bytes,
             s.stats_reset
     FROM pg_stat_get_replication_slots() AS s;
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 666ce95d08..e1ec7d8b7d 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
 	msg.m_stream_txns = repSlotStat->stream_txns;
 	msg.m_stream_count = repSlotStat->stream_count;
 	msg.m_stream_bytes = repSlotStat->stream_bytes;
+	msg.m_total_txns = repSlotStat->total_txns;
+	msg.m_total_bytes = repSlotStat->total_bytes;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 		replSlotStats[idx].stream_txns += msg->m_stream_txns;
 		replSlotStats[idx].stream_count += msg->m_stream_count;
 		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+		replSlotStats[idx].total_txns += msg->m_total_txns;
+		replSlotStats[idx].total_bytes += msg->m_total_bytes;
 	}
 }
 
@@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts)
 	replSlotStats[i].stream_txns = 0;
 	replSlotStats[i].stream_count = 0;
 	replSlotStats[i].stream_bytes = 0;
+	replSlotStats[i].total_txns = 0;
+	replSlotStats[i].total_bytes = 0;
 	replSlotStats[i].stat_reset_timestamp = ts;
 }
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 68e210ce12..ed9a7c8489 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1776,20 +1776,22 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	PgStat_ReplSlotStats repSlotStat;
 
 	/*
-	 * Nothing to do if we haven't spilled or streamed anything since the last
-	 * time the stats has been sent.
+	 * Nothing to do if we don't have any replication stats to be sent.
 	 */
-	if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 &&
+		rb->totalBytes <= 0 && rb->totalTxns <=0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
 		 rb,
 		 (long long) rb->spillTxns,
 		 (long long) rb->spillCount,
 		 (long long) rb->spillBytes,
 		 (long long) rb->streamTxns,
 		 (long long) rb->streamCount,
-		 (long long) rb->streamBytes);
+		 (long long) rb->streamBytes,
+		 (long long) rb->totalTxns,
+		 (long long) rb->totalBytes);
 
 	namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
 	repSlotStat.spill_txns = rb->spillTxns;
@@ -1798,6 +1800,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	repSlotStat.stream_txns = rb->streamTxns;
 	repSlotStat.stream_count = rb->streamCount;
 	repSlotStat.stream_bytes = rb->streamBytes;
+	repSlotStat.total_txns = rb->totalTxns;
+	repSlotStat.total_bytes = rb->totalBytes;
 
 	pgstat_report_replslot(&repSlotStat);
 	rb->spillTxns = 0;
@@ -1806,4 +1810,6 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->streamTxns = 0;
 	rb->streamCount = 0;
 	rb->streamBytes = 0;
+	rb->totalTxns = 0;
+	rb->totalBytes = 0;
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 52d06285a2..0f98189819 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -350,6 +350,8 @@ ReorderBufferAllocate(void)
 	buffer->streamTxns = 0;
 	buffer->streamCount = 0;
 	buffer->streamBytes = 0;
+	buffer->totalTxns = 0;
+	buffer->totalBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -1363,6 +1365,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		dlist_delete(&change->node);
 		dlist_push_tail(&state->old_change, &change->node);
 
+		/*
+		 * Update the total bytes processed before releasing the current set of
+		 * changes and restoring the new set of changes.
+		 */
+		rb->totalBytes += rb->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
 										&state->entries[off].segno))
 		{
@@ -2363,6 +2370,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
+		/*
+		 * Update the total transaction count and total transaction bytes
+		 * processed. Make sure a streamed transaction is not counted
+		 * multiple times.
+		 */
+		if (!rbtxn_is_streamed(txn))
+			rb->totalTxns++;
+
+		rb->totalBytes += rb->size;
+
 		/*
 		 * Done with current changes, send the last message for this set of
 		 * changes depending upon streaming mode.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 521ba73614..2680190a40 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2284,7 +2284,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2335,11 +2335,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 		values[4] = Int64GetDatum(s->stream_txns);
 		values[5] = Int64GetDatum(s->stream_count);
 		values[6] = Int64GetDatum(s->stream_bytes);
+		values[7] = Int64GetDatum(s->total_txns);
+		values[8] = Int64GetDatum(s->total_bytes);
 
 		if (s->stat_reset_timestamp == 0)
-			nulls[7] = true;
+			nulls[9] = true;
 		else
-			values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+			values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f4957653ae..591753fe81 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5315,9 +5315,9 @@
   proname => 'pg_stat_get_replication_slots', prorows => '10',
   proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 8e11215058..2aeb3cded4 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -548,6 +548,8 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_stream_txns;
 	PgStat_Counter m_stream_count;
 	PgStat_Counter m_stream_bytes;
+	PgStat_Counter m_total_txns;
+	PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
 /* ----------
@@ -924,6 +926,8 @@ typedef struct PgStat_ReplSlotStats
 	PgStat_Counter stream_txns;
 	PgStat_Counter stream_count;
 	PgStat_Counter stream_bytes;
+	PgStat_Counter total_txns;
+	PgStat_Counter total_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_ReplSlotStats;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6a..a372b70b7d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -618,6 +618,10 @@ struct ReorderBuffer
 	int64		streamTxns;		/* number of transactions streamed */
 	int64		streamCount;	/* streaming invocation counter */
 	int64		streamBytes;	/* amount of data streamed */
+
+	/* Statistics about all the replicated transactions */
+	int64		totalTxns;		/* total number of transactions replicated */
+	int64		totalBytes;		/* total amount of data replicated */
 };
 
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 186e6c966c..6399f3feef 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2068,8 +2068,10 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_txns,
     s.stream_count,
     s.stream_bytes,
+    s.total_txns,
+    s.total_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
-- 
2.25.1

From 8e78a79588a38137bc8ba15daf75a4b417b1c86b Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Mon, 5 Apr 2021 18:17:21 +0530
Subject: [PATCH v10 2/4] Added tests for verification of logical replication
 statistics.

Added tests for verification of logical replication statistics after
restart of server.
---
 contrib/test_decoding/Makefile            |  2 +
 contrib/test_decoding/t/001_repl_stats.pl | 76 +++++++++++++++++++++++
 src/backend/catalog/system_views.sql      |  5 +-
 3 files changed, 82 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/t/001_repl_stats.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c5e28ce5cc..9a31e0b879 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -17,6 +17,8 @@ ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 # typical installcheck users do not have (e.g. buildfarm clients).
 NO_INSTALLCHECK = 1
 
+TAP_TESTS = 1
+
 ifdef USE_PGXS
 PG_CONFIG = pg_config
 PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
new file mode 100644
index 0000000000..11b6cd9b9c
--- /dev/null
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -0,0 +1,76 @@
+# Test that replication statistics data in pg_stat_replication_slots is sane
+# after dropping a replication slot and restarting the server.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Test set-up
+my $node = get_new_node('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'synchronous_commit = on');
+$node->start;
+
+# Create table.
+$node->safe_psql('postgres',
+        "CREATE TABLE test_repl_stat(col1 int)");
+
+# Create replication slots.
+$node->safe_psql(
+	'postgres', qq[
+	SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding');
+	SELECT pg_create_logical_replication_slot('regression_slot2', 'test_decoding');
+	SELECT pg_create_logical_replication_slot('regression_slot3', 'test_decoding');
+	SELECT pg_create_logical_replication_slot('regression_slot4', 'test_decoding');
+]);
+
+# Insert some data.
+$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
+
+$node->safe_psql(
+	'postgres', qq[
+	SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL,
+	NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+	SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL,
+	NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+	SELECT data FROM pg_logical_slot_get_changes('regression_slot3', NULL,
+	NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+	SELECT data FROM pg_logical_slot_get_changes('regression_slot4', NULL,
+	NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+]);
+
+# Wait for the statistics to be updated.
+$node->poll_query_until(
+	'postgres', qq[
+	SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
+	WHERE slot_name ~ 'regression_slot'
+	AND total_txns > 0 AND total_bytes > 0;
+]) or die "Timed out while waiting for statistics to be updated";
+
+# Drop one of the replication slots and verify that the replication
+# statistics data is fine after restart.
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
+
+$node->stop;
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+my $result = $node->safe_psql('postgres',
+	"SELECT slot_name, total_txns > 0 AS total_txn,
+	total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
+	ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t
+regression_slot3|t|t), 'check replication statistics are updated');
+
+# cleanup
+$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
+
+# shutdown
+$node->stop;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6d78b33590..9fdf445579 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -878,7 +878,10 @@ CREATE VIEW pg_stat_replication_slots AS
             s.total_txns,
             s.total_bytes,
             s.stats_reset
-    FROM pg_stat_get_replication_slots() AS s;
+    FROM pg_replication_slots as r,
+        LATERAL pg_stat_get_replication_slot(slot_name) as s
+    WHERE r.datoid IS NOT NULL; -- excluding physical slots
+
 
 CREATE VIEW pg_stat_slru AS
     SELECT
-- 
2.25.1

From e10381b3a7a5b3dd412b7df8fb6667f3076cf28a Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Wed, 14 Apr 2021 16:56:39 +0530
Subject: [PATCH v10 3/4] Use HTAB for replication slot statistics.

Previously, we used an array to store stats for replication slots.
But this had two problems in the case where a drop-slot-stats message
is lost: 1) the stats for a new slot are not recorded, and 2) we write
beyond the end of the array when, after restarting, the number of
slots whose stats are stored in the stats file exceeds
max_replication_slots.

This commit changes to use an HTAB for replication slot statistics,
resolving both problems. Instead, pgstat_vacuum_stat() checks whether
the slot for each stats entry in the stats collector still exists and,
if it does not, sends a drop-slot-stats message.
---
 src/backend/catalog/system_views.sql      |   1 -
 src/backend/postmaster/pgstat.c           | 268 +++++++++++-----------
 src/backend/replication/logical/logical.c |   2 +-
 src/backend/replication/slot.c            |  23 +-
 src/backend/utils/adt/pgstatfuncs.c       | 135 ++++++-----
 src/include/catalog/pg_proc.dat           |  14 +-
 src/include/pgstat.h                      |   8 +-
 src/include/replication/slot.h            |   2 +-
 src/test/regress/expected/rules.out       |   4 +-
 9 files changed, 242 insertions(+), 215 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 9fdf445579..c95c900a12 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -882,7 +882,6 @@ CREATE VIEW pg_stat_replication_slots AS
         LATERAL pg_stat_get_replication_slot(slot_name) as s
     WHERE r.datoid IS NOT NULL; -- excluding physical slots
 
-
 CREATE VIEW pg_stat_slru AS
     SELECT
             s.name,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e1ec7d8b7d..27937e07b9 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -106,6 +106,7 @@
 #define PGSTAT_DB_HASH_SIZE		16
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
+#define PGSTAT_REPLSLOT_HASH_SIZE	32
 
 
 /* ----------
@@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats;
 static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
-static PgStat_ReplSlotStats *replSlotStats;
-static int	nReplSlotStats;
+static HTAB *replSlotStats = NULL;
 static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
 
 /*
@@ -319,8 +319,8 @@ static void backend_read_statsfile(void);
 static bool pgstat_write_statsfile_needed(void);
 static bool pgstat_db_requested(Oid databaseid);
 
-static int	pgstat_replslot_index(const char *name, bool create_it);
-static void pgstat_reset_replslot(int i, TimestampTz ts);
+static PgStat_ReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
+static void pgstat_reset_replslot(PgStat_ReplSlotEntry *slotstats, TimestampTz ts);
 
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
@@ -1109,6 +1109,24 @@ pgstat_vacuum_stat(void)
 	/* Clean up */
 	hash_destroy(htab);
 
+	/*
+	 * Check all replication slots in the stats hash table. We do this check
+	 * only when replSlotStats has more than max_replication_slots entries,
+	 * i.e., when there are stats for already-dropped slots, to avoid frequent
+	 * calls to SearchNamedReplicationSlot(), which acquires an LWLock.
+	 */
+	if (replSlotStats && hash_get_num_entries(replSlotStats) > max_replication_slots)
+	{
+		PgStat_ReplSlotEntry *slotentry;
+
+		hash_seq_init(&hstat, replSlotStats);
+		while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
+				pgstat_report_replslot_drop(NameStr(slotentry->slotname));
+		}
+	}
+
 	/*
 	 * Lookup our own database entry; if not found, nothing more to do.
 	 */
@@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name)
 
 	if (name)
 	{
-		ReplicationSlot *slot;
-
-		/*
-		 * Check if the slot exists with the given name. It is possible that by
-		 * the time this message is executed the slot is dropped but at least
-		 * this check will ensure that the given name is for a valid slot.
-		 */
-		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-		slot = SearchNamedReplicationSlot(name);
-		LWLockRelease(ReplicationSlotControlLock);
-
-		if (!slot)
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("replication slot \"%s\" does not exist",
-							name)));
-
-		/*
-		 * Nothing to do for physical slots as we collect stats only for
-		 * logical slots.
-		 */
-		if (SlotIsPhysical(slot))
-			return;
-
 		namestrcpy(&msg.m_slotname, name);
 		msg.clearall = false;
 	}
@@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize)
  * ----------
  */
 void
-pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
+pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -2872,17 +2866,19 @@ pgstat_fetch_slru(void)
  * pgstat_fetch_replslot() -
  *
  *	Support function for the SQL-callable pgstat* functions. Returns
- *	a pointer to the replication slot statistics struct and sets the
- *	number of entries in nslots_p.
+ *	a pointer to the replication slot statistics struct.
  * ---------
  */
-PgStat_ReplSlotStats *
-pgstat_fetch_replslot(int *nslots_p)
+PgStat_ReplSlotEntry *
+pgstat_fetch_replslot(NameData slotname)
 {
+	PgStat_ReplSlotEntry *slotent = NULL;
+
 	backend_read_statsfile();
 
-	*nslots_p = nReplSlotStats;
-	return replSlotStats;
+	slotent = pgstat_get_replslot_entry(slotname, false);
+
+	return slotent;
 }
 
 /*
@@ -3654,7 +3650,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
 	int			rc;
-	int			i;
 
 	elog(DEBUG2, "writing stats file \"%s\"", statfile);
 
@@ -3744,11 +3739,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	/*
 	 * Write replication slot stats struct
 	 */
-	for (i = 0; i < nReplSlotStats; i++)
+	if (replSlotStats)
 	{
-		fputc('R', fpout);
-		rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
-		(void) rc;				/* we'll check for error with ferror */
+		PgStat_ReplSlotEntry *slotentry;
+
+		hash_seq_init(&hstat, replSlotStats);
+		while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('R', fpout);
+			rc = fwrite(slotentry, sizeof(PgStat_ReplSlotEntry), 1, fpout);
+			(void) rc;				/* we'll check for error with ferror */
+		}
 	}
 
 	/*
@@ -3975,12 +3976,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
 						 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
-	/* Allocate the space for replication slot statistics */
-	replSlotStats = MemoryContextAllocZero(pgStatLocalContext,
-										   max_replication_slots
-										   * sizeof(PgStat_ReplSlotStats));
-	nReplSlotStats = 0;
-
 	/*
 	 * Clear out global, archiver, WAL and SLRU statistics so they start from
 	 * zero in case we can't load an existing statsfile.
@@ -4005,12 +4000,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
 		slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
 
-	/*
-	 * Set the same reset timestamp for all replication slots too.
-	 */
-	for (i = 0; i < max_replication_slots; i++)
-		replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
-
 	/*
 	 * Try to open the stats file. If it doesn't exist, the backends simply
 	 * return zero for anything and the collector simply starts from scratch
@@ -4197,21 +4186,27 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 				break;
 
 				/*
-				 * 'R'	A PgStat_ReplSlotStats struct describing a replication
+				 * 'R'	A PgStat_ReplSlotEntry struct describing a replication
 				 * slot follows.
 				 */
 			case 'R':
-				if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
-					!= sizeof(PgStat_ReplSlotStats))
+			{
+				PgStat_ReplSlotEntry slotstats;
+				PgStat_ReplSlotEntry *slotent;
+
+				if (fread(&slotstats, 1, sizeof(PgStat_ReplSlotEntry), fpin)
+					!= sizeof(PgStat_ReplSlotEntry))
 				{
 					ereport(pgStatRunningInCollector ? LOG : WARNING,
 							(errmsg("corrupted statistics file \"%s\"",
 									statfile)));
-					memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
 					goto done;
 				}
-				nReplSlotStats++;
+
+				slotent = pgstat_get_replslot_entry(slotstats.slotname, true);
+				memcpy(slotent, &slotstats, sizeof(PgStat_ReplSlotEntry));
 				break;
+			}
 
 			case 'E':
 				goto done;
@@ -4424,7 +4419,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 	PgStat_ArchiverStats myArchiverStats;
 	PgStat_WalStats myWalStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
-	PgStat_ReplSlotStats myReplSlotStats;
+	PgStat_ReplSlotEntry myReplSlotStats;
 	PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
 	FILE	   *fpin;
 	int32		format_id;
@@ -4553,12 +4548,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 				break;
 
 				/*
-				 * 'R'	A PgStat_ReplSlotStats struct describing a replication
+				 * 'R'	A PgStat_ReplSlotEntry struct describing a replication
 				 * slot follows.
 				 */
 			case 'R':
-				if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
-					!= sizeof(PgStat_ReplSlotStats))
+				if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotEntry), fpin)
+					!= sizeof(PgStat_ReplSlotEntry))
 				{
 					ereport(pgStatRunningInCollector ? LOG : WARNING,
 							(errmsg("corrupted statistics file \"%s\"",
@@ -4765,7 +4760,6 @@ pgstat_clear_snapshot(void)
 	pgStatLocalContext = NULL;
 	pgStatDBHash = NULL;
 	replSlotStats = NULL;
-	nReplSlotStats = 0;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -5189,20 +5183,26 @@ static void
 pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 								 int len)
 {
-	int			i;
-	int			idx = -1;
+	PgStat_ReplSlotEntry *slotent;
 	TimestampTz ts;
 
+	/* Return if we don't have replication slot statistics */
+	if (replSlotStats == NULL)
+		return;
+
 	ts = GetCurrentTimestamp();
 	if (msg->clearall)
 	{
-		for (i = 0; i < nReplSlotStats; i++)
-			pgstat_reset_replslot(i, ts);
+		HASH_SEQ_STATUS sstat;
+
+		hash_seq_init(&sstat, replSlotStats);
+		while ((slotent = (PgStat_ReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
+			pgstat_reset_replslot(slotent, ts);
 	}
 	else
 	{
 		/* Get the index of replication slot statistics to reset */
-		idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
+		slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
 
 		/*
 		 * Nothing to do if the given slot entry is not found.  This could
@@ -5210,11 +5210,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 		 * corresponding statistics entry is also removed before receiving the
 		 * reset message.
 		 */
-		if (idx < 0)
+		if (!slotent)
 			return;
 
 		/* Reset the stats for the requested replication slot */
-		pgstat_reset_replslot(idx, ts);
+		pgstat_reset_replslot(slotent, ts);
 	}
 }
 
@@ -5532,46 +5532,29 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
 static void
 pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 {
-	int			idx;
-
-	/*
-	 * Get the index of replication slot statistics.  On dropping, we don't
-	 * create the new statistics.
-	 */
-	idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
-
-	/*
-	 * The slot entry is not found or there is no space to accommodate the new
-	 * entry.  This could happen when the message for the creation of a slot
-	 * reached before the drop message even though the actual operations
-	 * happen in reverse order.  In such a case, the next update of the
-	 * statistics for the same slot will create the required entry.
-	 */
-	if (idx < 0)
-		return;
-
-	/* it must be a valid replication slot index */
-	Assert(idx < nReplSlotStats);
-
 	if (msg->m_drop)
 	{
 		/* Remove the replication slot statistics with the given name */
-		if (idx < nReplSlotStats - 1)
-			memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
-				   sizeof(PgStat_ReplSlotStats));
-		nReplSlotStats--;
+		if (replSlotStats != NULL)
+			(void) hash_search(replSlotStats, (void *) NameStr(msg->m_slotname),
+							   HASH_REMOVE, NULL);
 	}
 	else
 	{
+		PgStat_ReplSlotEntry *slotent;
+
+		slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
+		Assert(slotent);
+
 		/* Update the replication slot statistics */
-		replSlotStats[idx].spill_txns += msg->m_spill_txns;
-		replSlotStats[idx].spill_count += msg->m_spill_count;
-		replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
-		replSlotStats[idx].stream_txns += msg->m_stream_txns;
-		replSlotStats[idx].stream_count += msg->m_stream_count;
-		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
-		replSlotStats[idx].total_txns += msg->m_total_txns;
-		replSlotStats[idx].total_bytes += msg->m_total_bytes;
+		slotent->total_txns += msg->m_total_txns;
+		slotent->total_bytes += msg->m_total_bytes;
+		slotent->spill_txns += msg->m_spill_txns;
+		slotent->spill_count += msg->m_spill_count;
+		slotent->spill_bytes += msg->m_spill_bytes;
+		slotent->stream_txns += msg->m_stream_txns;
+		slotent->stream_count += msg->m_stream_count;
+		slotent->stream_bytes += msg->m_stream_bytes;
 	}
 }
 
@@ -5749,59 +5732,78 @@ pgstat_db_requested(Oid databaseid)
 }
 
 /* ----------
- * pgstat_replslot_index
+ * pgstat_get_replslot_entry
  *
- * Return the index of entry of a replication slot with the given name, or
- * -1 if the slot is not found.
+ * Return the entry of replication slot stats with the given name. Return
+ * NULL if not found and the caller didn't request to create it.
  *
  * create_it tells whether to create the new slot entry if it is not found.
  * ----------
  */
-static int
-pgstat_replslot_index(const char *name, bool create_it)
+static PgStat_ReplSlotEntry *
+pgstat_get_replslot_entry(NameData name, bool create_it)
 {
-	int			i;
+	PgStat_ReplSlotEntry *slotent;
+	bool    found;
 
-	Assert(nReplSlotStats <= max_replication_slots);
-	for (i = 0; i < nReplSlotStats; i++)
+	/*
+	 * Create the replication slot stats hash table if we don't have
+	 * it already.
+	 */
+	if (replSlotStats == NULL)
 	{
-		if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
-			return i;			/* found */
+		HASHCTL         hash_ctl;
+
+		hash_ctl.keysize = sizeof(NameData);
+		hash_ctl.entrysize = sizeof(PgStat_ReplSlotEntry);
+		hash_ctl.hcxt = pgStatLocalContext;
+
+		replSlotStats = hash_create("Replication slots hash",
+									PGSTAT_REPLSLOT_HASH_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 	}
 
-	/*
-	 * The slot is not found.  We don't want to register the new statistics if
-	 * the list is already full or the caller didn't request.
-	 */
-	if (i == max_replication_slots || !create_it)
-		return -1;
+	slotent = (PgStat_ReplSlotEntry *) hash_search(replSlotStats,
+												   (void *) &name,
+												   create_it ? HASH_ENTER : HASH_FIND,
+												   &found);
+	if (!slotent)
+	{
+		/* not found */
+		Assert(!create_it && !found);
+		return NULL;
+	}
 
-	/* Register new slot */
-	memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-	namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
+	/* initialize the entry */
+	if (create_it && !found)
+	{
+		memset(slotent, 0, sizeof(PgStat_ReplSlotEntry));
+		namestrcpy(&(slotent->slotname), NameStr(name));
+	}
 
-	return nReplSlotStats++;
+	return slotent;
 }
 
 /* ----------
  * pgstat_reset_replslot
  *
- * Reset the replication slot stats at index 'i'.
+ * Reset the given replication slot stats.
  * ----------
  */
 static void
-pgstat_reset_replslot(int i, TimestampTz ts)
+pgstat_reset_replslot(PgStat_ReplSlotEntry *slotent, TimestampTz ts)
 {
 	/* reset only counters. Don't clear slot name */
-	replSlotStats[i].spill_txns = 0;
-	replSlotStats[i].spill_count = 0;
-	replSlotStats[i].spill_bytes = 0;
-	replSlotStats[i].stream_txns = 0;
-	replSlotStats[i].stream_count = 0;
-	replSlotStats[i].stream_bytes = 0;
-	replSlotStats[i].total_txns = 0;
-	replSlotStats[i].total_bytes = 0;
-	replSlotStats[i].stat_reset_timestamp = ts;
+	slotent->spill_txns = 0;
+	slotent->spill_count = 0;
+	slotent->spill_bytes = 0;
+	slotent->stream_txns = 0;
+	slotent->stream_count = 0;
+	slotent->stream_bytes = 0;
+	slotent->total_txns = 0;
+	slotent->total_bytes = 0;
+	slotent->stat_reset_timestamp = ts;
 }
 
 /*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ed9a7c8489..73ef806530 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1773,7 +1773,7 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
 	ReorderBuffer *rb = ctx->reorder;
-	PgStat_ReplSlotStats repSlotStat;
+	PgStat_ReplSlotEntry repSlotStat;
 
 	/*
 	 * Nothing to do if we don't have any replication stats to be sent.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index f61b163f78..f75e7e95f9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -329,8 +329,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 */
 	if (SlotIsLogical(slot))
 	{
-		PgStat_ReplSlotStats repSlotStat;
-		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
+		PgStat_ReplSlotEntry repSlotStat;
+		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotEntry));
 		namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
 		pgstat_report_replslot(&repSlotStat);
 	}
@@ -349,17 +349,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  * Search for the named replication slot.
  *
  * Return the replication slot if found, otherwise NULL.
- *
- * The caller must hold ReplicationSlotControlLock in shared mode.
  */
 ReplicationSlot *
-SearchNamedReplicationSlot(const char *name)
+SearchNamedReplicationSlot(const char *name, bool need_lock)
 {
 	int			i;
 	ReplicationSlot	*slot = NULL;
 
-	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
-								LW_SHARED));
+	if (need_lock)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -372,6 +370,9 @@ SearchNamedReplicationSlot(const char *name)
 		}
 	}
 
+	if (need_lock)
+		LWLockRelease(ReplicationSlotControlLock);
+
 	return slot;
 }
 
@@ -416,7 +417,7 @@ retry:
 	 * Search for the slot with the specified name if the slot to acquire is
 	 * not given. If the slot is not found, we either return -1 or error out.
 	 */
-	s = slot ? slot : SearchNamedReplicationSlot(name);
+	s = slot ? slot : SearchNamedReplicationSlot(name, false);
 	if (s == NULL || !s->in_use)
 	{
 		LWLockRelease(ReplicationSlotControlLock);
@@ -712,7 +713,11 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	 * and create messages while holding ReplicationSlotAllocationLock to
 	 * reduce that possibility. If the messages reached in reverse, we would
 	 * lose one statistics update message. But the next update message will
-	 * create the statistics for the replication slot.
+	 * create the statistics for the replication slot. In the case where the
+	 * message for dropping the old slot gets lost and a slot with the same
+	 * name is created, the stats will be accumulated into the old slot's
+	 * entry since we use the slot name as the key. In that case, the user can
+	 * reset that slot's stats with pg_stat_reset_replication_slot().
 	 */
 	if (SlotIsLogical(slot))
 		pgstat_report_replslot_drop(NameStr(slot->data.name));
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 2680190a40..b1899d8c68 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
@@ -2207,8 +2208,32 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
 	char	   *target = NULL;
 
 	if (!PG_ARGISNULL(0))
+	{
+		ReplicationSlot *slot;
+
 		target = text_to_cstring(PG_GETARG_TEXT_PP(0));
 
+		/*
+		 * Check if the slot exists with the given name. It is possible that by
+		 * the time this message is executed the slot is dropped but at least
+		 * this check will ensure that the given name is for a valid slot.
+		 */
+		slot = SearchNamedReplicationSlot(target, true);
+
+		if (!slot)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("replication slot \"%s\" does not exist",
+							target)));
+
+		/*
+		 * Nothing to do for physical slots as we collect stats only for
+		 * logical slots.
+		 */
+		if (SlotIsPhysical(slot))
+			PG_RETURN_VOID();
+	}
+
 	pgstat_reset_replslot_counter(target);
 
 	PG_RETURN_VOID();
@@ -2280,73 +2305,67 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
 
-/* Get the statistics for the replication slots */
+/* Get the statistics for the replication slot */
 Datum
-pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
 #define PG_STAT_GET_REPLICATION_SLOT_COLS 10
-	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	text *slotname_text = PG_GETARG_TEXT_P(0);
+	NameData slotname;
 	TupleDesc	tupdesc;
-	Tuplestorestate *tupstore;
-	MemoryContext per_query_ctx;
-	MemoryContext oldcontext;
-	PgStat_ReplSlotStats *slotstats;
-	int			nstats;
-	int			i;
-
-	/* check to see if caller supports us returning a tuplestore */
-	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("set-valued function called in context that cannot accept a set")));
-	if (!(rsinfo->allowedModes & SFRM_Materialize))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("materialize mode required, but it is not allowed in this context")));
-
-	/* Build a tuple descriptor for our result type */
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
-	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
-	oldcontext = MemoryContextSwitchTo(per_query_ctx);
-
-	tupstore = tuplestore_begin_heap(true, false, work_mem);
-	rsinfo->returnMode = SFRM_Materialize;
-	rsinfo->setResult = tupstore;
-	rsinfo->setDesc = tupdesc;
+	Datum		values[10];
+	bool		nulls[10];
+	PgStat_ReplSlotEntry *slotent;
 
-	MemoryContextSwitchTo(oldcontext);
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
 
-	slotstats = pgstat_fetch_replslot(&nstats);
-	for (i = 0; i < nstats; i++)
-	{
-		Datum		values[PG_STAT_GET_REPLICATION_SLOT_COLS];
-		bool		nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
-		PgStat_ReplSlotStats *s = &(slotstats[i]);
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+					   TIMESTAMPTZOID, -1, 0);
+	BlessTupleDesc(tupdesc);
 
-		MemSet(values, 0, sizeof(values));
-		MemSet(nulls, 0, sizeof(nulls));
+	namestrcpy(&slotname, text_to_cstring(slotname_text));
+	slotent = pgstat_fetch_replslot(slotname);
 
-		values[0] = CStringGetTextDatum(NameStr(s->slotname));
-		values[1] = Int64GetDatum(s->spill_txns);
-		values[2] = Int64GetDatum(s->spill_count);
-		values[3] = Int64GetDatum(s->spill_bytes);
-		values[4] = Int64GetDatum(s->stream_txns);
-		values[5] = Int64GetDatum(s->stream_count);
-		values[6] = Int64GetDatum(s->stream_bytes);
-		values[7] = Int64GetDatum(s->total_txns);
-		values[8] = Int64GetDatum(s->total_bytes);
-
-		if (s->stat_reset_timestamp == 0)
-			nulls[9] = true;
-		else
-			values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
+	if (!slotent)
+		PG_RETURN_NULL();
 
-		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
-	}
+	values[0] = CStringGetTextDatum(NameStr(slotent->slotname));
+	values[1] = Int64GetDatum(slotent->spill_txns);
+	values[2] = Int64GetDatum(slotent->spill_count);
+	values[3] = Int64GetDatum(slotent->spill_bytes);
+	values[4] = Int64GetDatum(slotent->stream_txns);
+	values[5] = Int64GetDatum(slotent->stream_count);
+	values[6] = Int64GetDatum(slotent->stream_bytes);
+	values[7] = Int64GetDatum(slotent->total_txns);
+	values[8] = Int64GetDatum(slotent->total_bytes);
 
-	tuplestore_donestoring(tupstore);
+	if (slotent->stat_reset_timestamp == 0)
+		nulls[9] = true;
+	else
+		values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
-	return (Datum) 0;
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 591753fe81..a37cb8a426 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5311,14 +5311,14 @@
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
   prosrc => 'pg_stat_get_wal_receiver' },
-{ oid => '8595', descr => 'statistics: information about replication slots',
-  proname => 'pg_stat_get_replication_slots', prorows => '10',
+{ oid => '8595', descr => 'statistics: information about replication slot',
+  proname => 'pg_stat_get_replication_slot', prorows => '1',
   proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
-  prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
-  prosrc => 'pg_stat_get_replication_slots' },
+  prorettype => 'record', proargtypes => 'text',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+  prosrc => 'pg_stat_get_replication_slot' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 2aeb3cded4..e9e16e5e2d 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -917,7 +917,7 @@ typedef struct PgStat_SLRUStats
 /*
  * Replication slot statistics kept in the stats collector
  */
-typedef struct PgStat_ReplSlotStats
+typedef struct PgStat_ReplSlotEntry
 {
 	NameData	slotname;
 	PgStat_Counter spill_txns;
@@ -929,7 +929,7 @@ typedef struct PgStat_ReplSlotStats
 	PgStat_Counter total_txns;
 	PgStat_Counter total_bytes;
 	TimestampTz stat_reset_timestamp;
-} PgStat_ReplSlotStats;
+} PgStat_ReplSlotEntry;
 
 
 /*
@@ -1031,7 +1031,7 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
-extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
+extern void pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
@@ -1129,7 +1129,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
-extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
+extern PgStat_ReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
 extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1ad5e6c50d..357068403a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
-extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6399f3feef..ce2ecb914b 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2071,7 +2071,9 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.total_txns,
     s.total_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
+   FROM pg_replication_slots r,
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+  WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
-- 
2.25.1

From 6bcc1b29c7adffaf2b613ab5cc7ba3d3d9fc85a4 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Mon, 12 Apr 2021 16:10:11 +0530
Subject: [PATCH v10 4/4] Test where there are more replication slot statistics
 than max_replication_slot slots at startup.

There is a rare scenario in which one of the replication slots is dropped
but the drop-slot statistics message is not received by the statistics
collector process. If max_replication_slots is then reduced to the number
of replication slots actually in use and the server is restarted, the
statistics collector process is not aware of this and will write beyond
the slots available. Add a test for this scenario.
---
 contrib/test_decoding/t/001_repl_stats.pl | 30 +++++++++++++++++++++--
 1 file changed, 28 insertions(+), 2 deletions(-)

diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 11b6cd9b9c..5ca4d3d7a7 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -2,9 +2,10 @@
 # drop replication slot and restart.
 use strict;
 use warnings;
+use File::Path qw(rmtree);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 1;
+use Test::More tests => 2;
 
 # Test set-up
 my $node = get_new_node('test');
@@ -66,11 +67,36 @@ is($result, qq(regression_slot1|t|t
 regression_slot2|t|t
 regression_slot3|t|t), 'check replication statistics are updated');
 
+# Remove one of the replication slots and adjust max_replication_slots to
+# match the remaining number of slots. This leads to a mismatch between the
+# number of slot stats present in the stats file and the number of slots
+# present in shared memory, simulating the scenario where the drop-slot
+# message is lost by the statistics collector process. We verify that the
+# replication statistics data is sane after restart.
+
+$node->stop;
+my $datadir = $node->data_dir;
+my $slot3_replslotdir = "$datadir/pg_replslot/regression_slot3";
+
+rmtree($slot3_replslotdir);
+
+$node->append_conf('postgresql.conf', 'max_replication_slots = 2');
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+$result = $node->safe_psql('postgres',
+	"SELECT slot_name, total_txns > 0 AS total_txn,
+	total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
+	ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t), 'check replication statistics are updated');
+
 # cleanup
 $node->safe_psql('postgres', "DROP TABLE test_repl_stat");
 $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
 $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
-$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
 
 # shutdown
 $node->stop;
-- 
2.25.1

Reply via email to