On Sat, Apr 10, 2021 at 9:50 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Fri, Apr 9, 2021 at 4:13 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > 2.
> > @@ -2051,6 +2054,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
> > ReorderBufferTXN *txn,
> >   rb->begin(rb, txn);
> >   }
> >
> > + /*
> > + * Update total transaction count and total transaction bytes, if
> > + * transaction is streamed or spilled it will be updated while the
> > + * transaction gets spilled or streamed.
> > + */
> > + if (!rb->streamBytes && !rb->spillBytes)
> > + {
> > + rb->totalTxns++;
> > + rb->totalBytes += rb->size;
> > + }
> >
> > I think this will skip a transaction if it is interleaved with a
> > streaming transaction. Assume two transactions, t1 and t2. t1 sends
> > changes in multiple streams and t2 sends all changes in one go at
> > commit time. So, now, if t2 is interleaved between multiple streams
> > of t1, then I think the above won't count t2.
> >
> > 3.
> > @@ -3524,9 +3538,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb,
> > ReorderBufferTXN *txn)
> >   {
> >   rb->spillCount += 1;
> >   rb->spillBytes += size;
> > + rb->totalBytes += size;
> >
> >   /* don't consider already serialized transactions */
> >   rb->spillTxns += (rbtxn_is_serialized(txn) ||
> > rbtxn_is_serialized_clear(txn)) ? 0 : 1;
> > + rb->totalTxns += (rbtxn_is_serialized(txn) ||
> > rbtxn_is_serialized_clear(txn)) ? 0 : 1;
> >   }
> >
> > We do serialize each subtransaction separately. So totalTxns will
> > include subtransaction count as well when serialized, otherwise not.
> > The description of totalTxns also says that it doesn't include
> > subtransactions. So, I think updating rb->totalTxns here is wrong.
> >
>
> The attached patch should fix the above two comments. I think it
> should be sufficient if we just update the stats after processing the
> TXN. We need to ensure that we don't count streamed transactions
> multiple times. I have not tested the attached patch; can you please
> review/test it and include it in the next set of patches if you agree
> with this change?

Thanks, Amit, for your patch. I have merged your changes into my
patchset, and I did not find any issues in my testing.
Thoughts?

Regards,
Vignesh
From bb176d2399f4f550ceb0b733ee23d513cd835db9 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Sat, 10 Apr 2021 08:14:52 +0530
Subject: [PATCH v5 1/5] Changed char datatype to NameData datatype for
 slotname.

Changed char datatype to NameData datatype for slotname.
---
 src/backend/postmaster/pgstat.c           | 32 +++++++++++------------
 src/backend/replication/logical/logical.c | 13 ++++++---
 src/backend/replication/slot.c            |  7 ++++-
 src/backend/utils/adt/pgstatfuncs.c       |  2 +-
 src/include/pgstat.h                      | 15 ++++++-----
 5 files changed, 40 insertions(+), 29 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f4467625f7..1becff09d0 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -64,6 +64,7 @@
 #include "storage/pg_shmem.h"
 #include "storage/proc.h"
 #include "storage/procsignal.h"
+#include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -1539,7 +1540,7 @@ pgstat_reset_replslot_counter(const char *name)
 		if (SlotIsPhysical(slot))
 			return;
 
-		strlcpy(msg.m_slotname, name, NAMEDATALEN);
+		namestrcpy(&msg.m_slotname, name);
 		msg.clearall = false;
 	}
 	else
@@ -1812,10 +1813,7 @@ pgstat_report_tempfile(size_t filesize)
  * ----------
  */
 void
-pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns,
-					   PgStat_Counter spillcount, PgStat_Counter spillbytes,
-					   PgStat_Counter streamtxns, PgStat_Counter streamcount,
-					   PgStat_Counter streambytes)
+pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -1823,14 +1821,14 @@ pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns,
 	 * Prepare and send the message
 	 */
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
-	strlcpy(msg.m_slotname, slotname, NAMEDATALEN);
+	namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
 	msg.m_drop = false;
-	msg.m_spill_txns = spilltxns;
-	msg.m_spill_count = spillcount;
-	msg.m_spill_bytes = spillbytes;
-	msg.m_stream_txns = streamtxns;
-	msg.m_stream_count = streamcount;
-	msg.m_stream_bytes = streambytes;
+	msg.m_spill_txns = repSlotStat->spill_txns;
+	msg.m_spill_count = repSlotStat->spill_count;;
+	msg.m_spill_bytes = repSlotStat->spill_bytes;
+	msg.m_stream_txns = repSlotStat->stream_txns;
+	msg.m_stream_count = repSlotStat->stream_count;
+	msg.m_stream_bytes = repSlotStat->stream_bytes;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -1846,7 +1844,7 @@ pgstat_report_replslot_drop(const char *slotname)
 	PgStat_MsgReplSlot msg;
 
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
-	strlcpy(msg.m_slotname, slotname, NAMEDATALEN);
+	namestrcpy(&msg.m_slotname, slotname);
 	msg.m_drop = true;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
@@ -5202,7 +5200,7 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 	else
 	{
 		/* Get the index of replication slot statistics to reset */
-		idx = pgstat_replslot_index(msg->m_slotname, false);
+		idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
 
 		/*
 		 * Nothing to do if the given slot entry is not found.  This could
@@ -5538,7 +5536,7 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 	 * Get the index of replication slot statistics.  On dropping, we don't
 	 * create the new statistics.
 	 */
-	idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop);
+	idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
 
 	/*
 	 * The slot entry is not found or there is no space to accommodate the new
@@ -5763,7 +5761,7 @@ pgstat_replslot_index(const char *name, bool create_it)
 	Assert(nReplSlotStats <= max_replication_slots);
 	for (i = 0; i < nReplSlotStats; i++)
 	{
-		if (strcmp(replSlotStats[i].slotname, name) == 0)
+		if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
 			return i;			/* found */
 	}
 
@@ -5776,7 +5774,7 @@ pgstat_replslot_index(const char *name, bool create_it)
 
 	/* Register new slot */
 	memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-	strlcpy(replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN);
+	namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
 
 	return nReplSlotStats++;
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 4f6e87f18d..68e210ce12 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1773,6 +1773,7 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
 	ReorderBuffer *rb = ctx->reorder;
+	PgStat_ReplSlotStats repSlotStat;
 
 	/*
 	 * Nothing to do if we haven't spilled or streamed anything since the last
@@ -1790,9 +1791,15 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 (long long) rb->streamCount,
 		 (long long) rb->streamBytes);
 
-	pgstat_report_replslot(NameStr(ctx->slot->data.name),
-						   rb->spillTxns, rb->spillCount, rb->spillBytes,
-						   rb->streamTxns, rb->streamCount, rb->streamBytes);
+	namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
+	repSlotStat.spill_txns = rb->spillTxns;
+	repSlotStat.spill_count = rb->spillCount;
+	repSlotStat.spill_bytes = rb->spillBytes;
+	repSlotStat.stream_txns = rb->streamTxns;
+	repSlotStat.stream_count = rb->streamCount;
+	repSlotStat.stream_bytes = rb->streamBytes;
+
+	pgstat_report_replslot(&repSlotStat);
 	rb->spillTxns = 0;
 	rb->spillCount = 0;
 	rb->spillBytes = 0;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 75a087c2f9..f61b163f78 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -328,7 +328,12 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * ReplicationSlotAllocationLock.
 	 */
 	if (SlotIsLogical(slot))
-		pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
+	{
+		PgStat_ReplSlotStats repSlotStat;
+		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
+		namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
+		pgstat_report_replslot(&repSlotStat);
+	}
 
 	/*
 	 * Now that the slot has been marked as in_use and active, it's safe to
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 182b15e3f2..521ba73614 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2328,7 +2328,7 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 		MemSet(values, 0, sizeof(values));
 		MemSet(nulls, 0, sizeof(nulls));
 
-		values[0] = PointerGetDatum(cstring_to_text(s->slotname));
+		values[0] = CStringGetTextDatum(NameStr(s->slotname));
 		values[1] = Int64GetDatum(s->spill_txns);
 		values[2] = Int64GetDatum(s->spill_count);
 		values[3] = Int64GetDatum(s->spill_bytes);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 9a87e7cd88..2aeb3cded4 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -393,7 +393,7 @@ typedef struct PgStat_MsgResetslrucounter
 typedef struct PgStat_MsgResetreplslotcounter
 {
 	PgStat_MsgHdr m_hdr;
-	char		m_slotname[NAMEDATALEN];
+	NameData      m_slotname;
 	bool		clearall;
 } PgStat_MsgResetreplslotcounter;
 
@@ -540,7 +540,7 @@ typedef struct PgStat_MsgSLRU
 typedef struct PgStat_MsgReplSlot
 {
 	PgStat_MsgHdr m_hdr;
-	char		m_slotname[NAMEDATALEN];
+	NameData	m_slotname;
 	bool		m_drop;
 	PgStat_Counter m_spill_txns;
 	PgStat_Counter m_spill_count;
@@ -548,6 +548,8 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_stream_txns;
 	PgStat_Counter m_stream_count;
 	PgStat_Counter m_stream_bytes;
+	PgStat_Counter m_total_txns;
+	PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
 /* ----------
@@ -917,13 +919,15 @@ typedef struct PgStat_SLRUStats
  */
 typedef struct PgStat_ReplSlotStats
 {
-	char		slotname[NAMEDATALEN];
+	NameData	slotname;
 	PgStat_Counter spill_txns;
 	PgStat_Counter spill_count;
 	PgStat_Counter spill_bytes;
 	PgStat_Counter stream_txns;
 	PgStat_Counter stream_count;
 	PgStat_Counter stream_bytes;
+	PgStat_Counter total_txns;
+	PgStat_Counter total_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_ReplSlotStats;
 
@@ -1027,10 +1031,7 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
-extern void pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns,
-								   PgStat_Counter spillcount, PgStat_Counter spillbytes,
-								   PgStat_Counter streamtxns, PgStat_Counter streamcount,
-								   PgStat_Counter streambytes);
+extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
-- 
2.25.1

From cf6fbb7a582f89390acc8c003269435ddefc4ce7 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Sat, 10 Apr 2021 08:57:05 +0530
Subject: [PATCH v5 2/5] Added total txns and total txn bytes to replication
 statistics.

This adds statistics about the total number of transactions and the total
amount of transaction data sent to the decoding output plugin from the
ReorderBuffer. Users can query the pg_stat_replication_slots view to check
these stats.
---
 contrib/test_decoding/expected/stats.out      | 79 +++++++++++++------
 contrib/test_decoding/sql/stats.sql           | 48 +++++++----
 doc/src/sgml/monitoring.sgml                  | 23 ++++++
 src/backend/catalog/system_views.sql          |  2 +
 src/backend/postmaster/pgstat.c               |  6 ++
 src/backend/replication/logical/logical.c     | 16 ++--
 .../replication/logical/reorderbuffer.c       | 12 +++
 src/backend/utils/adt/pgstatfuncs.c           |  8 +-
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/replication/reorderbuffer.h       |  4 +
 src/test/regress/expected/rules.out           |  4 +-
 11 files changed, 159 insertions(+), 49 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index bca36fa903..bc8e601eab 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 
 CREATE TABLE stats_test(data text);
 -- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
 DECLARE
   start_time timestamptz := clock_timestamp();
   updated bool;
@@ -16,12 +16,25 @@ BEGIN
   -- we don't want to wait forever; loop will exit after 30 seconds
   FOR i IN 1 .. 300 LOOP
 
-    -- check to see if all updates have been reset/updated
-    SELECT CASE WHEN check_reset THEN (spill_txns = 0)
-                ELSE (spill_txns > 0)
-           END
-    INTO updated
-    FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+    IF check_spill_txns THEN
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+                  ELSE (spill_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    ELSE
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (total_txns = 0)
+                  ELSE (total_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    END IF;
 
     exit WHEN updated;
 
@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
 -- Check stats, wait for the stats collector to update. We can't test the
 -- exact stats count as that can vary if any background transaction (say by
 -- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot | t          | t
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t          | t           | t          | t
 (1 row)
 
 -- reset the slot stats, and wait for stats collector to reset
@@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot');
  
 (1 row)
 
-SELECT wait_for_decode_stats(true);
+SELECT wait_for_decode_stats(true, true);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot |          0 |           0
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot |          0 |           0 |          0 |           0
 (1 row)
 
 -- decode and check stats again.
@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
   5002
 (1 row)
 
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
+ wait_for_decode_stats 
+-----------------------
+ 
+(1 row)
+
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t          | t           | t          | t
+(1 row)
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+ pg_stat_reset_replication_slot 
+--------------------------------
+ 
+(1 row)
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot | t          | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | f          | f           | t          | t
 (1 row)
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
@@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots;
 (1 row)
 
 COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 51294e48e8..8c34aeced1 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -6,7 +6,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 CREATE TABLE stats_test(data text);
 
 -- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
 DECLARE
   start_time timestamptz := clock_timestamp();
   updated bool;
@@ -14,12 +14,25 @@ BEGIN
   -- we don't want to wait forever; loop will exit after 30 seconds
   FOR i IN 1 .. 300 LOOP
 
-    -- check to see if all updates have been reset/updated
-    SELECT CASE WHEN check_reset THEN (spill_txns = 0)
-                ELSE (spill_txns > 0)
-           END
-    INTO updated
-    FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+    IF check_spill_txns THEN
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+                  ELSE (spill_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    ELSE
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (total_txns = 0)
+                  ELSE (total_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    END IF;
 
     exit WHEN updated;
 
@@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
 -- Check stats, wait for the stats collector to update. We can't test the
 -- exact stats count as that can vary if any background transaction (say by
 -- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
 
 -- reset the slot stats, and wait for stats collector to reset
 SELECT pg_stat_reset_replication_slot('regression_slot');
-SELECT wait_for_decode_stats(true);
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(true, true);
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
 
 -- decode and check stats again.
 SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
@@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots;
 SELECT slot_name FROM pg_stat_replication_slots;
 COMMIT;
 
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 8287587f61..d5e2012b38 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2716,6 +2716,29 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent to the decoding output plugin for
+        this slot. Only top-level transactions are counted; the counter is
+        not incremented for subtransactions.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded transaction data sent to the decoding output plugin
+        while decoding the changes from WAL for this slot. This can be used to
+        gauge the total amount of data sent during logical decoding.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 451db2ee0a..6d78b33590 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_txns,
             s.stream_count,
             s.stream_bytes,
+            s.total_txns,
+            s.total_bytes,
             s.stats_reset
     FROM pg_stat_get_replication_slots() AS s;
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 1becff09d0..e5b1fb045e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
 	msg.m_stream_txns = repSlotStat->stream_txns;
 	msg.m_stream_count = repSlotStat->stream_count;
 	msg.m_stream_bytes = repSlotStat->stream_bytes;
+	msg.m_total_txns = repSlotStat->total_txns;
+	msg.m_total_bytes = repSlotStat->total_bytes;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 		replSlotStats[idx].stream_txns += msg->m_stream_txns;
 		replSlotStats[idx].stream_count += msg->m_stream_count;
 		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+		replSlotStats[idx].total_txns += msg->m_total_txns;
+		replSlotStats[idx].total_bytes += msg->m_total_bytes;
 	}
 }
 
@@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts)
 	replSlotStats[i].stream_txns = 0;
 	replSlotStats[i].stream_count = 0;
 	replSlotStats[i].stream_bytes = 0;
+	replSlotStats[i].total_txns = 0;
+	replSlotStats[i].total_bytes = 0;
 	replSlotStats[i].stat_reset_timestamp = ts;
 }
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 68e210ce12..ed9a7c8489 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1776,20 +1776,22 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	PgStat_ReplSlotStats repSlotStat;
 
 	/*
-	 * Nothing to do if we haven't spilled or streamed anything since the last
-	 * time the stats has been sent.
+	 * Nothing to do if we don't have any replication stats to be sent.
 	 */
-	if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 &&
+		rb->totalBytes <= 0 && rb->totalTxns <=0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
 		 rb,
 		 (long long) rb->spillTxns,
 		 (long long) rb->spillCount,
 		 (long long) rb->spillBytes,
 		 (long long) rb->streamTxns,
 		 (long long) rb->streamCount,
-		 (long long) rb->streamBytes);
+		 (long long) rb->streamBytes,
+		 (long long) rb->totalTxns,
+		 (long long) rb->totalBytes);
 
 	namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
 	repSlotStat.spill_txns = rb->spillTxns;
@@ -1798,6 +1800,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	repSlotStat.stream_txns = rb->streamTxns;
 	repSlotStat.stream_count = rb->streamCount;
 	repSlotStat.stream_bytes = rb->streamBytes;
+	repSlotStat.total_txns = rb->totalTxns;
+	repSlotStat.total_bytes = rb->totalBytes;
 
 	pgstat_report_replslot(&repSlotStat);
 	rb->spillTxns = 0;
@@ -1806,4 +1810,6 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->streamTxns = 0;
 	rb->streamCount = 0;
 	rb->streamBytes = 0;
+	rb->totalTxns = 0;
+	rb->totalBytes = 0;
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 52d06285a2..bc251adfda 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -350,6 +350,8 @@ ReorderBufferAllocate(void)
 	buffer->streamTxns = 0;
 	buffer->streamCount = 0;
 	buffer->streamBytes = 0;
+	buffer->totalTxns = 0;
+	buffer->totalBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -2359,6 +2361,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			specinsert = NULL;
 		}
 
+		/*
+		 * Update the total transaction count and the total number of bytes
+		 * processed. Take care not to count a streamed transaction more than
+		 * once.
+		 */
+		if (!rbtxn_is_streamed(txn))
+			rb->totalTxns++;
+
+		rb->totalBytes += rb->size;
+
 		/* clean up the iterator */
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 521ba73614..2680190a40 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2284,7 +2284,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2335,11 +2335,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 		values[4] = Int64GetDatum(s->stream_txns);
 		values[5] = Int64GetDatum(s->stream_count);
 		values[6] = Int64GetDatum(s->stream_bytes);
+		values[7] = Int64GetDatum(s->total_txns);
+		values[8] = Int64GetDatum(s->total_bytes);
 
 		if (s->stat_reset_timestamp == 0)
-			nulls[7] = true;
+			nulls[9] = true;
 		else
-			values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+			values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f4957653ae..591753fe81 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5315,9 +5315,9 @@
   proname => 'pg_stat_get_replication_slots', prorows => '10',
   proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6a..a372b70b7d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -618,6 +618,10 @@ struct ReorderBuffer
 	int64		streamTxns;		/* number of transactions streamed */
 	int64		streamCount;	/* streaming invocation counter */
 	int64		streamBytes;	/* amount of data streamed */
+
+	/* Statistics about all the replicated transactions */
+	int64		totalTxns;		/* total number of transactions replicated */
+	int64		totalBytes;		/* total amount of data replicated */
 };
 
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 186e6c966c..6399f3feef 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2068,8 +2068,10 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_txns,
     s.stream_count,
     s.stream_bytes,
+    s.total_txns,
+    s.total_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
-- 
2.25.1

From d971aa9ec544ce6614b8b369dbb342ed132a06a8 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Mon, 5 Apr 2021 18:17:21 +0530
Subject: [PATCH v5 3/5] Added tests for verification of logical replication
 statistics.

Added tests for verification of logical replication statistics after
restart of server.
---
 contrib/test_decoding/Makefile            |   2 +
 contrib/test_decoding/t/001_repl_stats.pl | 111 ++++++++++++++++++++++
 2 files changed, 113 insertions(+)
 create mode 100644 contrib/test_decoding/t/001_repl_stats.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c5e28ce5cc..9a31e0b879 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -17,6 +17,8 @@ ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 # typical installcheck users do not have (e.g. buildfarm clients).
 NO_INSTALLCHECK = 1
 
+TAP_TESTS = 1
+
 ifdef USE_PGXS
 PG_CONFIG = pg_config
 PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
new file mode 100644
index 0000000000..194f764d18
--- /dev/null
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -0,0 +1,111 @@
+# Test that replication statistics data in pg_stat_replication_slots is sane
+# after dropping a replication slot and restarting the server.
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Test set-up
+my $node = get_new_node('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'synchronous_commit = on');
+$node->start;
+
+$node->safe_psql('postgres', q(CREATE FUNCTION wait_for_decode_stats(check_slot_name text) RETURNS void AS $$
+DECLARE
+  start_time timestamptz := clock_timestamp();
+  txn_count bool;
+BEGIN
+  -- we don't want to wait forever; loop will exit after 30 seconds
+  FOR i IN 1 .. 300 LOOP
+
+    -- check to see if all updates have been reset/updated
+    SELECT (total_txns > 0) INTO txn_count
+    FROM pg_stat_replication_slots WHERE slot_name=check_slot_name;
+
+    exit WHEN txn_count;
+
+    -- wait a little
+    perform pg_sleep_for('100 milliseconds');
+
+    -- reset stats snapshot so we can test again
+    perform pg_stat_clear_snapshot();
+
+  END LOOP;
+
+  -- report time waited in postmaster log (where it won't change test output)
+  RAISE LOG 'wait_for_decode_stats delayed % seconds',
+    extract(epoch from clock_timestamp() - start_time);
+END
+$$ LANGUAGE plpgsql;
+));
+
+# Create table.
+$node->safe_psql('postgres',
+        "CREATE TABLE test_repl_stat(col1 int)");
+
+# Create replication slots.
+$node->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding')");
+$node->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding')");
+$node->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot3', 'test_decoding')");
+$node->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot4', 'test_decoding')");
+
+# Insert some data.
+$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
+
+$node->safe_psql('postgres',
+	"SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')");
+$node->safe_psql('postgres',
+	"SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')");
+$node->safe_psql('postgres',
+        "SELECT data FROM pg_logical_slot_get_changes('regression_slot3', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')");
+$node->safe_psql('postgres',
+	"SELECT data FROM pg_logical_slot_get_changes('regression_slot4', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')");
+
+# Wait for the statistics to be updated.
+$node->safe_psql('postgres', "SELECT wait_for_decode_stats('regression_slot1')");
+$node->safe_psql('postgres', "SELECT wait_for_decode_stats('regression_slot2')");
+$node->safe_psql('postgres', "SELECT wait_for_decode_stats('regression_slot3')");
+$node->safe_psql('postgres', "SELECT wait_for_decode_stats('regression_slot4')");
+
+# Test to verify replication statistics data is updated in
+# pg_stat_replication_slots statistics view.
+my $result = $node->safe_psql('postgres',
+	"SELECT slot_name, total_txns > 0 AS total_txn, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t
+regression_slot3|t|t
+regression_slot4|t|t), 'check replication statistics are updated');
+
+# Drop one of the replication slots and verify that the replication statistics
+# data is fine after restart.
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
+
+$node->stop;
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+$result = $node->safe_psql('postgres',
+	"SELECT slot_name, total_txns > 0 AS total_txn, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t
+regression_slot3|t|t), 'check replication statistics are updated');
+
+# cleanup
+$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
+$node->safe_psql('postgres', "DROP FUNCTION wait_for_decode_stats(TEXT)");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
+
+# shutdown
+$node->stop;
-- 
2.25.1

From 51b4c60e7c47947bc08b72c822c8f37552aa7ad6 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Sat, 10 Apr 2021 11:33:01 +0530
Subject: [PATCH v5 4/5] Handle overwriting of replication slot statistic
 issue.

There is a rare scenario where a replication slot is dropped but the
drop-slot statistics message is not received by the statistics collector
process. If max_replication_slots is then reduced to the number of
replication slots actually in use and the server is restarted, the
statistics collector is unaware of the dropped slot and will write beyond
the slots available. Fix this by writing the replication slot count to the
statistics file and, at startup time, increasing max_replication_slots if
required based on the value read from the statistics file.
---
 src/backend/postmaster/pgstat.c | 36 +++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e5b1fb045e..d7923bfde4 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3741,6 +3741,13 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 		(void) rc;				/* we'll check for error with ferror */
 	}
 
+	if (nReplSlotStats > 0)
+	{
+		fputc('N', fpout);
+		rc = fwrite(&nReplSlotStats, sizeof(nReplSlotStats), 1, fpout);
+		(void) rc;				/* we'll check for error with ferror */
+	}
+
 	/*
 	 * Write replication slot stats struct
 	 */
@@ -3960,6 +3967,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	bool		found;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
 	int			i;
+	int			replslotcount;
 
 	/*
 	 * The tables will live in pgStatLocalContext.
@@ -4196,6 +4204,34 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 
 				break;
 
+				/*
+				 * 'N'	Indicates the number of replication slot statistics
+				 * present.
+				 */
+			case 'N':
+				if (fread(&replslotcount, 1, sizeof(replslotcount), fpin)
+					!=sizeof(replslotcount))
+				{
+					ereport(pgStatRunningInCollector ? LOG : WARNING,
+							(errmsg("corrupted statistics file \"%s\"",
+									statfile)));
+					goto done;
+				}
+
+				if (replslotcount > max_replication_slots)
+				{
+					max_replication_slots = replslotcount;
+					replSlotStats = repalloc(replSlotStats,
+											 max_replication_slots
+											 * sizeof(PgStat_ReplSlotStats));
+					/*
+					 * Set the same reset timestamp for all replication slots too.
+					 */
+					for (i = 0; i < max_replication_slots; i++)
+						replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
+				}
+				break;
+
 				/*
 				 * 'R'	A PgStat_ReplSlotStats struct describing a replication
 				 * slot follows.
-- 
2.25.1

From 03250a6bdeb93979b069ab0ab063cd6afbc55118 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Wed, 7 Apr 2021 13:07:26 +0530
Subject: [PATCH v5 5/5] Test where there are more replication slot statistics
 than max_replication_slots at startup.

There is a rare scenario where a replication slot is dropped but the
drop-slot statistics message is not received by the statistics collector
process. If max_replication_slots is then reduced to the number of
replication slots actually in use and the server is restarted, the
statistics collector is unaware of the dropped slot and will write beyond
the slots available. Add a test for this scenario.
---
 contrib/test_decoding/t/001_repl_stats.pl | 24 +++++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 194f764d18..932acee380 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -5,7 +5,7 @@ use warnings;
 use File::Path qw(rmtree);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 2;
+use Test::More tests => 3;
 
 # Test set-up
 my $node = get_new_node('test');
@@ -100,12 +100,32 @@ is($result, qq(regression_slot1|t|t
 regression_slot2|t|t
 regression_slot3|t|t), 'check replication statistics are updated');
 
+# Remove one of the replication slots, lower max_replication_slots to match the
+# number of remaining slots, and verify that the replication statistics data is
+# fine after restart.
+$node->stop;
+my $datadir = $node->data_dir;
+my $slot3_replslotdir = "$datadir/pg_replslot/regression_slot3";
+
+rmtree($slot3_replslotdir);
+
+$node->append_conf('postgresql.conf', 'max_replication_slots = 2');
+$node->start;
+
+# Verify that the statistics in pg_stat_replication_slots are still sane after
+# the restart with max_replication_slots lowered to 2.
+$result = $node->safe_psql('postgres',
+	"SELECT slot_name, total_txns > 0 AS total_txn, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t
+regression_slot3|t|t), 'check replication statistics are sane after restart with reduced max_replication_slots');
+
 # cleanup
 $node->safe_psql('postgres', "DROP TABLE test_repl_stat");
 $node->safe_psql('postgres', "DROP FUNCTION wait_for_decode_stats(TEXT)");
 $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
 $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
-$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
 
 # shutdown
 $node->stop;
-- 
2.25.1

Reply via email to