On Mon, Oct 27, 2025 at 4:47 PM shveta malik <[email protected]> wrote:
>
> Few comments:
>
> 1)
> pgoutput_truncate:
>
> if (nrelids > 0)
> {
> OutputPluginPrepareWrite(ctx, true);
> logicalrep_write_truncate(ctx->out,
>   xid,
>   nrelids,
>   relids,
>   change->data.truncate.cascade,
>   change->data.truncate.restart_seqs);
> OutputPluginWrite(ctx, true);
> }
> + else
> + ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
> +
>
> It seems that filteredBytes are only counted for TRUNCATE when nrelids
> is 0. Can nrelids only be 0 or same as nrelations?
>
> The below code makes me think that nrelids can be any number between 0
> and nrelations, depending on which relations are publishable and which
> supports publishing TRUNCATE. If that’s true, shouldn’t we count
> filteredBytes in each such skipped case?

IIUC, you are suggesting that we should add
ReorderBufferChangeSize(change) for every relation which is not part
of the publication or whose truncate is not published. I think that
won't be correct, since it can lead to a situation where filtered
bytes > total bytes, which should never happen. Even if there is a
single publishable relation whose truncate is published, the change
should not be considered as filtered, since something would be output
downstream. Otherwise filtered bytes as well as sent bytes will both
be incremented, causing an inconsistency (which would be hard to
notice, since total bytes - filtered bytes is related to the sent
bytes but the exact correlation is hard to express in a formula).
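
To make the concern concrete, here is a minimal standalone sketch. It
is not PostgreSQL code: SketchStats, count_publishable,
account_all_or_nothing and account_per_relation are hypothetical
names, and total_bytes is bumped inside the helpers only to keep the
example self-contained. It contrasts the accounting done in the patch
with the per-relation alternative.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the counters; not the PostgreSQL struct. */
typedef struct SketchStats
{
	int64_t		total_bytes;	/* bytes handed to the plugin */
	int64_t		sent_bytes;		/* bytes written downstream */
	int64_t		filtered_bytes; /* bytes dropped by the plugin */
} SketchStats;

/* Count relations that are published and publish TRUNCATE. */
static int
count_publishable(const bool *publishable, int nrelations)
{
	int			nrelids = 0;

	for (int i = 0; i < nrelations; i++)
		if (publishable[i])
			nrelids++;
	return nrelids;
}

/* As in the patch: the change is filtered only if nothing is output. */
static void
account_all_or_nothing(SketchStats *stats, const bool *publishable,
					   int nrelations, int64_t change_size,
					   int64_t output_size)
{
	stats->total_bytes += change_size;
	if (count_publishable(publishable, nrelations) > 0)
		stats->sent_bytes += output_size;
	else
		stats->filtered_bytes += change_size;
}

/* The alternative: add the full change size once per skipped relation. */
static void
account_per_relation(SketchStats *stats, const bool *publishable,
					 int nrelations, int64_t change_size,
					 int64_t output_size)
{
	int			nrelids = count_publishable(publishable, nrelations);

	stats->total_bytes += change_size;
	if (nrelids > 0)
		stats->sent_bytes += output_size;
	stats->filtered_bytes += (int64_t) (nrelations - nrelids) * change_size;
}
```

With three relations of which only one is publishable, a 100-byte
change and a 40-byte output message, the first helper leaves
filtered_bytes at 0 and sent_bytes at 40, whereas the second reports
200 filtered bytes against 100 total bytes.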

We may increment filteredBytes by sizeof(Oid) for every relation we
skip here, OR by ReorderBufferChangeSize(change) if all the relations
are filtered, but that depends too much on how the WAL record is
encoded, and adding that dependency in output plugin code seems
hard to manage.

If you are suggesting something else, maybe sharing actual code
changes would help.

>
>
> 2)
> + int64 filteredBytes; /* amount of data from reoder buffer that was
>
> reoder --> reorder

Done.

>
> 3)
> One small nitpick:
>
> + /*
> + * If output plugin has chosen to maintain its stats, update the amount of
> + * data sent downstream.
> + */
> + if (ctx->stats)
> + ctx->stats->sentBytes += ctx->out->len + sizeof(XLogRecPtr) +
> sizeof(TransactionId);
>
> The way sentBytes is updated here feels a bit unnatural; we’re adding
> the lengths for values[2], then [0], and then [1]. Would it be cleaner
> to introduce a len[3] array similar to the existing values[3] and
> nulls[3] arrays? We could initialize len[i] alongside values[i], and
> later just sum up all three elements when updating
> ctx->stats->sentBytes. It would be easier to understand as well.

Instead of an array of length 3, we could keep a counter sentBytes to
accumulate all the lengths. It will be added to ctx->stats->sentBytes
at the end if ctx->stats != NULL. That might appear as if we are
performing additions even when the result won't be used ultimately,
but that's not the case, since this plugin will always maintain
stats. Changed that way.

-- 
Best Wishes,
Ashutosh Bapat
From 287217cf4aefeee0461bb77aa3377804752bc6e0 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <[email protected]>
Date: Tue, 28 Oct 2025 12:42:04 +0530
Subject: [PATCH 2/2] Address Shveta's comments

---
 src/backend/replication/logical/logicalfuncs.c | 6 +++++-
 src/include/replication/output_plugin.h        | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d2ab41de438..55e02e7ee21 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -65,6 +65,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	Datum		values[3];
 	bool		nulls[3];
 	DecodingOutputState *p;
+	int64		sentBytes = 0;
 
 	/* SQL Datums can only be of a limited length... */
 	if (ctx->out->len > MaxAllocSize - VARHDRSZ)
@@ -74,7 +75,9 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 
 	memset(nulls, 0, sizeof(nulls));
 	values[0] = LSNGetDatum(lsn);
+	sentBytes += sizeof(XLogRecPtr);
 	values[1] = TransactionIdGetDatum(xid);
+	sentBytes += sizeof(TransactionId);
 
 	/*
 	 * Assert ctx->out is in database encoding when we're writing textual
@@ -87,6 +90,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 
 	/* ick, but cstring_to_text_with_len works for bytea perfectly fine */
 	values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
+	sentBytes += ctx->out->len;
 
 	tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
 
@@ -95,7 +99,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	 * data sent downstream.
 	 */
 	if (ctx->stats)
-		ctx->stats->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId);
+		ctx->stats->sentBytes += sentBytes;
 
 	p->returned_rows++;
 }
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 02018f0593c..4cc939e6c98 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -38,7 +38,7 @@ typedef struct OutputPluginStats
 	int64		sentTxns;		/* number of transactions decoded and sent
 								 * downstream */
 	int64		sentBytes;		/* amount of data decoded and sent downstream */
-	int64		filteredBytes;	/* amount of data from reoder buffer that was
+	int64		filteredBytes;	/* amount of data from reorder buffer that was
 								 * filtered out by the output plugin */
 } OutputPluginStats;
 
-- 
2.34.1

From e353db4c93be08371c6a949d86c33c5bad41bb78 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <[email protected]>
Date: Fri, 27 Jun 2025 09:16:23 +0530
Subject: [PATCH 1/2] Report output plugin statistics in
 pg_stat_replication_slots

As of now pg_stat_replication_slots reports statistics about the reorder
buffer, but it does not report output plugin statistics like the amount of data
filtered by the output plugin, amount of data sent downstream or the
number of transactions sent downstream. These statistics are useful when
investigating issues related to a slow downstream.

This commit adds the following fields to pg_stat_replication_slots:
- plugin_filtered_bytes is the amount of changes filtered out by the
  output plugin
- plugin_sent_txns is the number of transactions sent downstream by the
  output plugin
- plugin_sent_bytes is the amount of data sent downstream by the output
  plugin.

The prefix "plugin_" indicates that these counters are related to and
maintained by the output plugin. An output plugin may choose not to
initialize LogicalDecodingContext::stats, which holds these counters, in
which case the above columns will be reported as NULL.

When the stats are disabled after being enabled for a while, the plugin
stats are reset to 0, rather than carrying over the stale stats from the
time when the plugin was supporting the stats. This does not matter if
the plugin continues not to support statistics forever. But in case it
was supporting the stats once, discontinued doing so at some point in
time and then starts supporting the stats later, accumulating the new
stats based on the earlier accumulated stats could be misleading.

Filtered bytes are reported next to total_bytes to keep these two
closely related fields together.

Additionally, report the name of the output plugin in the view for easy
reference.

total_bytes and total_txns are the only fields remaining unqualified -
they do not convey what those bytes and txns are. Hence rename them to
total_wal_bytes and total_wal_txns respectively to indicate that those
counts come from the WAL stream.

Author: Ashutosh Bapat <[email protected]>
Reviewed-by: Shveta Malik <[email protected]>
Reviewed-by: Bertrand Drouvot <[email protected]>
Reviewed-by: Ashutosh Sharma <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Discussion: https://www.postgresql.org/message-id/CAExHW5s6KntzUyUoMbKR5dgwRmdV2Ay_2+AnTgYGAzo=qv6...@mail.gmail.com
---
 contrib/test_decoding/expected/stats.out      | 77 ++++++++++---------
 contrib/test_decoding/sql/stats.sql           | 16 ++--
 contrib/test_decoding/t/001_repl_stats.pl     | 22 ++++--
 contrib/test_decoding/test_decoding.c         |  2 +
 doc/src/sgml/logicaldecoding.sgml             | 36 +++++++++
 doc/src/sgml/monitoring.sgml                  | 70 +++++++++++++++--
 src/backend/catalog/system_views.sql          |  8 +-
 src/backend/replication/logical/logical.c     | 28 ++++++-
 .../replication/logical/logicalfuncs.c        |  8 ++
 .../replication/logical/reorderbuffer.c       |  3 +-
 src/backend/replication/pgoutput/pgoutput.c   | 21 +++++
 src/backend/replication/walsender.c           |  7 ++
 src/backend/utils/activity/pgstat_replslot.c  | 19 ++++-
 src/backend/utils/adt/pgstatfuncs.c           | 34 ++++++--
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/pgstat.h                          |  8 +-
 src/include/replication/logical.h             |  1 +
 src/include/replication/output_plugin.h       | 13 ++++
 src/include/replication/reorderbuffer.h       |  1 +
 src/test/recovery/t/006_logical_decoding.pl   | 12 +--
 .../t/035_standby_logical_decoding.pl         |  4 +-
 src/test/regress/expected/rules.out           | 10 ++-
 src/test/subscription/t/001_rep_changes.pl    | 11 +++
 src/test/subscription/t/010_truncate.pl       | 20 +++++
 src/test/subscription/t/028_row_filter.pl     | 11 +++
 src/tools/pgindent/typedefs.list              |  1 +
 26 files changed, 360 insertions(+), 89 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index 28da9123cc8..0e5c5fa5b18 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -37,12 +37,17 @@ SELECT pg_stat_force_next_flush();
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t          | t           | t          | t           | t
- regression_slot_stats2 | t          | t           | t          | t           | t
- regression_slot_stats3 | t          | t           | t          | t           | t
+-- total_wal_txns may vary based on the background activity but plugin_sent_txns
+-- should always be 1 since the background transactions are always skipped.
+-- Filtered bytes would be set only when there's a change that was passed to the
+-- plugin but was filtered out. Depending upon the background transactions,
+-- filtered bytes may or may not be zero.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count 
+------------------------+------------+-------------+----------------+-----------------+------------------+------------+----------------+--------------------
+ regression_slot_stats1 | t          | t           | t              | t               |                1 | t          | t              | t
+ regression_slot_stats2 | t          | t           | t              | t               |                1 | t          | t              | t
+ regression_slot_stats3 | t          | t           | t              | t               |                1 | t          | t              | t
 (3 rows)
 
 RESET logical_decoding_work_mem;
@@ -53,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t          | t           | f          | f           | t
- regression_slot_stats2 | t          | t           | t          | t           | t
- regression_slot_stats3 | t          | t           | t          | t           | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count 
+------------------------+------------+-------------+----------------+-----------------+------------------+------------+----------------+--------------------
+ regression_slot_stats1 | t          | t           | f              | f               |                  |            |                | t
+ regression_slot_stats2 | t          | t           | t              | t               |                1 | t          | t              | t
+ regression_slot_stats3 | t          | t           | t              | t               |                1 | t          | t              | t
 (3 rows)
 
 -- reset stats for all slots
@@ -68,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL);
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t          | t           | f          | f           | t
- regression_slot_stats2 | t          | t           | f          | f           | t
- regression_slot_stats3 | t          | t           | f          | f           | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | plugin_sent_bytes | plugin_filtered_bytes | mem_exceeded_count 
+------------------------+------------+-------------+----------------+-----------------+------------------+-------------------+-----------------------+--------------------
+ regression_slot_stats1 | t          | t           | f              | f               |                  |                   |                       | t
+ regression_slot_stats2 | t          | t           | f              | f               |                  |                   |                       | t
+ regression_slot_stats3 | t          | t           | f              | f               |                  |                   |                       | t
 (3 rows)
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+-----------------------+------------------+-------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |              0 |               0 |                       |                  |                   | 
 (1 row)
 
 SELECT pg_stat_reset_replication_slot('do-not-exist');
 ERROR:  replication slot "do-not-exist" does not exist
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+-----------------------+------------------+-------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |              0 |               0 |                       |                  |                   | 
 (1 row)
 
 -- spilling the xact
@@ -121,20 +126,20 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count,
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
 BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
-       slot_name        
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+       slot_name        |    plugin     
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
 (3 rows)
 
-SELECT slot_name FROM pg_stat_replication_slots;
-       slot_name        
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+       slot_name        |    plugin     
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
 (3 rows)
 
 COMMIT;
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 6661dbcb85c..d6bf3cde8b1 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -15,16 +15,22 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL,
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+
+-- total_wal_txns may vary based on the background activity but plugin_sent_txns
+-- should always be 1 since the background transactions are always skipped.
+-- Filtered bytes would be set only when there's a change that was passed to the
+-- plugin but was filtered out. Depending upon the background transactions,
+-- filtered bytes may or may not be zero.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 RESET logical_decoding_work_mem;
 
 -- reset stats for one slot, others should be unaffected
 SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- reset stats for all slots
 SELECT pg_stat_reset_replication_slot(NULL);
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
@@ -46,8 +52,8 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count,
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
 BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
-SELECT slot_name FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
 COMMIT;
 
 
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 0de62edb7d8..756fc691ed6 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -23,10 +23,16 @@ sub test_slot_stats
 
 	my ($node, $expected, $msg) = @_;
 
+	# If there are background transactions which are filtered out by the output
+	# plugin, plugin_filtered_bytes may be greater than 0. But it's not
+	# guaranteed that such transactions would be present.
 	my $result = $node->safe_psql(
 		'postgres', qq[
-		SELECT slot_name, total_txns > 0 AS total_txn,
-			   total_bytes > 0 AS total_bytes
+		SELECT slot_name, total_wal_txns > 0 AS total_txn,
+			   total_wal_bytes > 0 AS total_bytes,
+			   plugin_sent_txns > 0 AS sent_txn,
+			   plugin_sent_bytes > 0 AS sent_bytes,
+			   plugin_filtered_bytes >= 0 AS filtered_bytes
 			   FROM pg_stat_replication_slots
 			   ORDER BY slot_name]);
 	is($result, $expected, $msg);
@@ -65,7 +71,7 @@ $node->poll_query_until(
 	'postgres', qq[
 	SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
 	WHERE slot_name ~ 'regression_slot'
-	AND total_txns > 0 AND total_bytes > 0;
+	AND total_wal_txns > 0 AND total_wal_bytes > 0;
 ]) or die "Timed out while waiting for statistics to be updated";
 
 # Test to drop one of the replication slot and verify replication statistics data is
@@ -80,9 +86,9 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t
-regression_slot3|t|t),
+	qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t
+regression_slot3|t|t|t|t|t),
 	'check replication statistics are updated');
 
 # Test to remove one of the replication slots and adjust
@@ -104,8 +110,8 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t),
+	qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t),
 	'check replication statistics after removing the slot file');
 
 # cleanup
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 36e77c69e1c..d06f6c3f92b 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -173,6 +173,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->only_local = false;
 
 	ctx->output_plugin_private = data;
+	ctx->stats = palloc0(sizeof(OutputPluginStats));
 
 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
 	opt->receive_rewrites = false;
@@ -310,6 +311,7 @@ static void
 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 {
 	OutputPluginPrepareWrite(ctx, last_write);
+	ctx->stats->sentTxns++;
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
 	else
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index b803a819cf1..0bf9ffbfd28 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -938,6 +938,42 @@ typedef struct OutputPluginOptions
       needs to have a state, it can
       use <literal>ctx-&gt;output_plugin_private</literal> to store it.
      </para>
+
+     <para>
+      The startup callback may initialize <literal>ctx-&gt;stats</literal>,
+      typically as follows, if it chooses to maintain and report statistics
+      about its activity in <structname>pg_stat_replication_slots</structname>.
+<programlisting>
+ctx->stats = palloc0(sizeof(OutputPluginStats));
+</programlisting>
+      where <literal>OutputPluginStats</literal> is defined as follows:
+<programlisting>
+typedef struct OutputPluginStats
+{
+      int64   sentTxns;
+      int64   sentBytes;
+      int64   filteredBytes;
+} OutputPluginStats;
+</programlisting>
+      <literal>sentTxns</literal> is the number of transactions sent downstream
+      by the output plugin. <literal>sentBytes</literal> is the amount of data,
+      in bytes, sent downstream by the output plugin.
+      <literal>filteredBytes</literal> is the size of changes, in bytes, that
+      are filtered out by the output plugin.
+      <function>OutputPluginWrite</function> will update
+      <literal>sentBytes</literal> if <literal>ctx-&gt;stats</literal> is
+      initialized by the output plugin. Function
+      <literal>ReorderBufferChangeSize</literal> may be used to find the size of
+      filtered <literal>ReorderBufferChange</literal>.
+     </para>
+
+     <note>
+      <para>
+       Once a plugin starts reporting and maintaining these statistics, it is
+       not expected that they will discontinue doing so. If they do, the result
+       may be misleading because of the cumulative nature of these statistics.
+      </para>
+     </note>
     </sect3>
 
     <sect3 id="logicaldecoding-output-plugin-shutdown">
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index d5f0fb7ba7c..1ccf781f45e 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1549,6 +1549,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>plugin</structfield> <type>text</type>
+       </para>
+       <para>
+        The base name of the shared object containing the output plugin this
+        logical slot is using. This column is the same as the one in
+        <structname>pg_replication_slots</structname>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>spill_txns</structfield> <type>bigint</type>
@@ -1637,19 +1648,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-        <structfield>total_txns</structfield> <type>bigint</type>
+        <structfield>total_wal_txns</structfield> <type>bigint</type>
        </para>
        <para>
-        Number of decoded transactions sent to the decoding output plugin for
-        this slot. This counts top-level transactions only, and is not incremented
-        for subtransactions. Note that this includes the transactions that are
-        streamed and/or spilled.
+        Number of decoded transactions from WAL sent to the decoding output
+        plugin for this slot. This counts top-level transactions only, and is
+        not incremented for subtransactions. Note that this includes the
+        transactions that are streamed and/or spilled.
        </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-        <structfield>total_bytes</structfield><type>bigint</type>
+        <structfield>total_wal_bytes</structfield><type>bigint</type>
        </para>
        <para>
         Amount of transaction data decoded for sending transactions to the
@@ -1659,6 +1670,53 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>plugin_filtered_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of changes, from <structfield>total_wal_bytes</structfield>, filtered
+        out by the output plugin and not sent downstream. Please note that it
+        does not include the changes filtered before a change is sent to
+        the output plugin, e.g. the changes filtered by origin. The counter is
+        maintained by the output plugin mentioned in
+        <structfield>plugin</structfield>. It is NULL when statistics are not
+        initialized or immediately after a reset or when not maintained by the
+        output plugin.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>plugin_sent_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent downstream for this slot. This
+        counts top-level transactions only, and is not incremented for
+        subtransactions. These transactions are a subset of transactions sent to
+        the decoding plugin. Hence this count is expected to be less than or
+        equal to <structfield>total_wal_txns</structfield>.  The counter is maintained
+        by the output plugin mentioned in <structfield>plugin</structfield>.  It
+        is NULL when statistics is not initialized or immediately after a reset or
+        when not maintained by the output plugin.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>plugin_sent_bytes</structfield><type>bigint</type>
+       </para>
+       <para>
+        Amount of transaction changes sent downstream for this slot by the
+        output plugin after applying filtering and converting into its output
+        format. The counter is maintained by the output plugin named in
+        <structfield>plugin</structfield>. It is NULL when statistics are not
+        initialized, immediately after a reset, or when the output plugin does
+        not maintain it.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 823776c1498..be91e9b01e4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1067,6 +1067,7 @@ CREATE VIEW pg_replication_slots AS
 CREATE VIEW pg_stat_replication_slots AS
     SELECT
             s.slot_name,
+            r.plugin,
             s.spill_txns,
             s.spill_count,
             s.spill_bytes,
@@ -1074,8 +1075,11 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_count,
             s.stream_bytes,
             s.mem_exceeded_count,
-            s.total_txns,
-            s.total_bytes,
+            s.total_wal_txns,
+            s.total_wal_bytes,
+            s.plugin_filtered_bytes,
+            s.plugin_sent_txns,
+            s.plugin_sent_bytes,
             s.stats_reset
     FROM pg_replication_slots as r,
         LATERAL pg_stat_get_replication_slot(slot_name) as s
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 93ed2eb368e..f0810f05153 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1952,6 +1952,7 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
 	ReorderBuffer *rb = ctx->reorder;
+	OutputPluginStats *stats = ctx->stats;
 	PgStat_StatReplSlotEntry repSlotStat;
 
 	/* Nothing to do if we don't have any replication stats to be sent. */
@@ -1959,7 +1960,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		rb->memExceededCount <= 0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " (%s) %" PRId64 " %" PRId64 " %" PRId64,
 		 rb,
 		 rb->spillTxns,
 		 rb->spillCount,
@@ -1969,7 +1970,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 rb->streamBytes,
 		 rb->memExceededCount,
 		 rb->totalTxns,
-		 rb->totalBytes);
+		 rb->totalBytes,
+		 stats ? "plugin has stats" : "plugin has no stats",
+		 stats ? stats->sentTxns : 0,
+		 stats ? stats->sentBytes : 0,
+		 stats ? stats->filteredBytes : 0);
 
 	repSlotStat.spill_txns = rb->spillTxns;
 	repSlotStat.spill_count = rb->spillCount;
@@ -1978,8 +1983,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	repSlotStat.stream_count = rb->streamCount;
 	repSlotStat.stream_bytes = rb->streamBytes;
 	repSlotStat.mem_exceeded_count = rb->memExceededCount;
-	repSlotStat.total_txns = rb->totalTxns;
-	repSlotStat.total_bytes = rb->totalBytes;
+	repSlotStat.total_wal_txns = rb->totalTxns;
+	repSlotStat.total_wal_bytes = rb->totalBytes;
+	if (stats)
+	{
+		repSlotStat.plugin_has_stats = true;
+		repSlotStat.plugin_sent_txns = stats->sentTxns;
+		repSlotStat.plugin_sent_bytes = stats->sentBytes;
+		repSlotStat.plugin_filtered_bytes = stats->filteredBytes;
+	}
+	else
+		repSlotStat.plugin_has_stats = false;
 
 	pgstat_report_replslot(ctx->slot, &repSlotStat);
 
@@ -1992,6 +2006,12 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->memExceededCount = 0;
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
+	if (stats)
+	{
+		stats->sentTxns = 0;
+		stats->sentBytes = 0;
+		stats->filteredBytes = 0;
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 25f890ddeed..d2ab41de438 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -89,6 +89,14 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
 
 	tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
+
+	/*
+	 * If the output plugin has chosen to maintain its stats, update the amount
+	 * of data sent downstream.
+	 */
+	if (ctx->stats)
+		ctx->stats->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId);
+
 	p->returned_rows++;
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b57aef9916d..d336ef3a51f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -310,7 +310,6 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
  * memory accounting
  * ---------------------------------------
  */
-static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											ReorderBufferTXN *txn,
@@ -4458,7 +4457,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 /*
  * Size of a change in memory.
  */
-static Size
+Size
 ReorderBufferChangeSize(ReorderBufferChange *change)
 {
 	Size		sz = sizeof(ReorderBufferChange);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 942e1abdb58..4b35f2de6aa 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -473,6 +473,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	MemoryContextRegisterResetCallback(ctx->context, mcallback);
 
 	ctx->output_plugin_private = data;
+	ctx->stats = palloc0(sizeof(OutputPluginStats));
 
 	/* This plugin uses binary protocol. */
 	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
@@ -614,6 +615,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
 	txndata->sent_begin_txn = true;
+	ctx->stats->sentTxns++;
 
 	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
 					 send_replication_origin);
@@ -1492,7 +1494,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *new_slot = NULL;
 
 	if (!is_publishable_relation(relation))
+	{
+		ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1510,15 +1515,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
+			{
+				ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
+			{
+				ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 				return;
+			}
 
 			/*
 			 * This is only possible if deletes are allowed even when replica
@@ -1528,6 +1542,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			if (!change->data.tp.oldtuple)
 			{
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+				ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 				return;
 			}
 			break;
@@ -1583,7 +1598,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * of the row filter for old and new tuple.
 	 */
 	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+	{
+		ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 		goto cleanup;
+	}
 
 	/*
 	 * Send BEGIN if we haven't yet.
@@ -1711,6 +1729,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 								  change->data.truncate.restart_seqs);
 		OutputPluginWrite(ctx, true);
 	}
+	else
+		ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
+
 
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 548eafa7a73..b0a5d4da7a7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1587,6 +1587,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
 
+	/*
+	 * If the output plugin maintains statistics, update the amount of data
+	 * sent downstream.
+	 */
+	if (ctx->stats)
+		ctx->stats->sentBytes += ctx->out->len + 1; /* +1 for the 'd' */
+
 	CHECK_FOR_INTERRUPTS();
 
 	/* Try to flush pending output to the client */
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index d210c261ac6..42ca13bd76a 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -88,6 +88,7 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
 
 	/* Update the replication slot statistics */
 #define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld
+#define REPLSLOT_SET_TO_ZERO(fld) statent->fld = 0
 	REPLSLOT_ACC(spill_txns);
 	REPLSLOT_ACC(spill_count);
 	REPLSLOT_ACC(spill_bytes);
@@ -95,9 +96,23 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
 	REPLSLOT_ACC(stream_count);
 	REPLSLOT_ACC(stream_bytes);
 	REPLSLOT_ACC(mem_exceeded_count);
-	REPLSLOT_ACC(total_txns);
-	REPLSLOT_ACC(total_bytes);
+	REPLSLOT_ACC(total_wal_txns);
+	REPLSLOT_ACC(total_wal_bytes);
+	statent->plugin_has_stats = repSlotStat->plugin_has_stats;
+	if (repSlotStat->plugin_has_stats)
+	{
+		REPLSLOT_ACC(plugin_sent_txns);
+		REPLSLOT_ACC(plugin_sent_bytes);
+		REPLSLOT_ACC(plugin_filtered_bytes);
+	}
+	else
+	{
+		REPLSLOT_SET_TO_ZERO(plugin_sent_txns);
+		REPLSLOT_SET_TO_ZERO(plugin_sent_bytes);
+		REPLSLOT_SET_TO_ZERO(plugin_filtered_bytes);
+	}
 #undef REPLSLOT_ACC
+#undef REPLSLOT_SET_TO_ZERO
 
 	pgstat_unlock_entry(entry_ref);
 }
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 1fe33df2756..be8d30ca9c6 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2121,7 +2121,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 11
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 14
 	text	   *slotname_text = PG_GETARG_TEXT_P(0);
 	NameData	slotname;
 	TupleDesc	tupdesc;
@@ -2148,11 +2148,17 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "mem_exceeded_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_txns",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_wal_txns",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_wal_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "plugin_filtered_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "plugin_sent_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "plugin_sent_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 14, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2176,13 +2182,25 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	values[5] = Int64GetDatum(slotent->stream_count);
 	values[6] = Int64GetDatum(slotent->stream_bytes);
 	values[7] = Int64GetDatum(slotent->mem_exceeded_count);
-	values[8] = Int64GetDatum(slotent->total_txns);
-	values[9] = Int64GetDatum(slotent->total_bytes);
+	values[8] = Int64GetDatum(slotent->total_wal_txns);
+	values[9] = Int64GetDatum(slotent->total_wal_bytes);
+	if (slotent->plugin_has_stats)
+	{
+		values[10] = Int64GetDatum(slotent->plugin_filtered_bytes);
+		values[11] = Int64GetDatum(slotent->plugin_sent_txns);
+		values[12] = Int64GetDatum(slotent->plugin_sent_bytes);
+	}
+	else
+	{
+		nulls[10] = true;
+		nulls[11] = true;
+		nulls[12] = true;
+	}
 
 	if (slotent->stat_reset_timestamp == 0)
-		nulls[10] = true;
+		nulls[13] = true;
 	else
-		values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+		values[13] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index eecb43ec6f0..11404660a56 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5691,9 +5691,9 @@
 { oid => '6169', descr => 'statistics: information about replication slot',
   proname => 'pg_stat_get_replication_slot', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,stats_reset}',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_wal_txns,total_wal_bytes,plugin_filtered_bytes,plugin_sent_txns,plugin_sent_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
 
 { oid => '6230', descr => 'statistics: check if a stats object exists',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index bc8077cbae6..ae11f39dd3b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -396,8 +396,12 @@ typedef struct PgStat_StatReplSlotEntry
 	PgStat_Counter stream_count;
 	PgStat_Counter stream_bytes;
 	PgStat_Counter mem_exceeded_count;
-	PgStat_Counter total_txns;
-	PgStat_Counter total_bytes;
+	PgStat_Counter total_wal_txns;
+	PgStat_Counter total_wal_bytes;
+	bool		plugin_has_stats;
+	PgStat_Counter plugin_sent_txns;
+	PgStat_Counter plugin_sent_bytes;
+	PgStat_Counter plugin_filtered_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 2e562bee5a9..010c59f783d 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -52,6 +52,7 @@ typedef struct LogicalDecodingContext
 
 	OutputPluginCallbacks callbacks;
 	OutputPluginOptions options;
+	OutputPluginStats *stats;
 
 	/*
 	 * User specified options
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 8d4d5b71887..02018f0593c 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -29,6 +29,19 @@ typedef struct OutputPluginOptions
 	bool		receive_rewrites;
 } OutputPluginOptions;
 
+/*
+ * Statistics about the transactions decoded and sent downstream by the output
+ * plugin.
+ */
+typedef struct OutputPluginStats
+{
+	int64		sentTxns;		/* number of transactions decoded and sent
+								 * downstream */
+	int64		sentBytes;		/* amount of data decoded and sent downstream */
+	int64		filteredBytes;	/* amount of data from reorder buffer that was
+								 * filtered out by the output plugin */
+} OutputPluginStats;
+
 /*
  * Type of the shared library symbol _PG_output_plugin_init that is looked up
  * when loading an output plugin shared library.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3cbe106a3c7..382eba66a76 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -718,6 +718,7 @@ extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids);
 extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid,
 									 XLogRecPtr lsn, ReorderBufferChange *change,
 									 bool toast_insert);
+extern Size ReorderBufferChangeSize(ReorderBufferChange *change);
 extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 									  Snapshot snap, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index f6c0a5bf649..be564b53bc6 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -215,10 +215,10 @@ my $stats_test_slot2 = 'logical_slot';
 # Stats exist for stats test slot 1
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT total_wal_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
+	qq(t|t|t),
+	qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
 );
 
 # Do reset of stats for stats test slot 1
@@ -236,10 +236,10 @@ $node_primary->safe_psql('postgres',
 
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT stats_reset > '$reset1'::timestamptz, total_wal_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.)
+	qq(t|t|t),
+	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_wal_bytes and plugin_sent_bytes were set to 0 and NULL respectively.)
 );
 
 # Check that test slot 2 has NULL in reset timestamp
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index ebe2fae1789..5f4df30d65a 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -577,7 +577,7 @@ $node_primary->safe_psql('testdb',
 	qq[INSERT INTO decoding_test(x,y) SELECT 100,'100';]);
 
 $node_standby->poll_query_until('testdb',
-	qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
+	qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
 ) or die "replication slot stats of vacuum_full_activeslot not updated";
 
 # This should trigger the conflict
@@ -605,7 +605,7 @@ ok( $stderr =~
 # Ensure that replication slot stats are not removed after invalidation.
 is( $node_standby->safe_psql(
 		'testdb',
-		qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
+		qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
 	),
 	't',
 	'replication slot stats not removed after invalidation');
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 16753b2e4c0..d77059ae186 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2142,6 +2142,7 @@ pg_stat_replication| SELECT s.pid,
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
+    r.plugin,
     s.spill_txns,
     s.spill_count,
     s.spill_bytes,
@@ -2149,11 +2150,14 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_count,
     s.stream_bytes,
     s.mem_exceeded_count,
-    s.total_txns,
-    s.total_bytes,
+    s.total_wal_txns,
+    s.total_wal_bytes,
+    s.plugin_filtered_bytes,
+    s.plugin_sent_txns,
+    s.plugin_sent_bytes,
     s.stats_reset
    FROM pg_replication_slots r,
-    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, stats_reset)
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_wal_txns, total_wal_bytes, plugin_filtered_bytes, plugin_sent_txns, plugin_sent_bytes, stats_reset)
   WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT name,
     blks_zeroed,
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 430c1246d14..7f37b6fe6c6 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -124,6 +124,9 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
 is($result, qq(1002), 'check initial data was copied to subscriber');
 
+my $initial_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
@@ -157,6 +160,14 @@ $node_publisher->safe_psql('postgres',
 
 $node_publisher->wait_for_catchup('tap_sub');
 
+# Verify that plugin_filtered_bytes increases due to filtered update and delete
+# operations on tab_ins.  We cannot test the exact value since it may include
+# changes from other concurrent transactions.
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+	'plugin_filtered_bytes increased after DML filtering');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_ins");
 is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
index 3d16c2a800d..c41ad317221 100644
--- a/src/test/subscription/t/010_truncate.pl
+++ b/src/test/subscription/t/010_truncate.pl
@@ -69,6 +69,9 @@ $node_subscriber->safe_psql('postgres',
 # Wait for initial sync of all subscriptions
 $node_subscriber->wait_for_subscription_sync;
 
+my $initial_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'sub2'");
+
 # insert data to truncate
 
 $node_subscriber->safe_psql('postgres',
@@ -98,6 +101,16 @@ $node_publisher->wait_for_catchup('sub1');
 $result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')");
 is($result, qq(101), 'truncate restarted identities');
 
+# All the DMLs above happen on tables that are subscribed to by sub1 and not
+# sub2. plugin_filtered_bytes should get incremented for the replication slot
+# corresponding to subscription sub2. We cannot test the exact value of
+# plugin_filtered_bytes because the counter is affected by background activity.
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+	'plugin_filtered_bytes increased after publication level filtering');
+$initial_filtered_bytes = $final_filtered_bytes;
+
 # test publication that does not replicate truncate
 
 $node_subscriber->safe_psql('postgres',
@@ -107,6 +120,13 @@ $node_publisher->safe_psql('postgres', "TRUNCATE tab2");
 
 $node_publisher->wait_for_catchup('sub2');
 
+# Truncate changes are filtered out at the publication level. Make sure that
+# plugin_filtered_bytes is incremented.
+$final_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+	'plugin_filtered_bytes increased after truncate filtering');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab2");
 is($result, qq(3|1|3), 'truncate not replicated');
diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl
index e2c83670053..039bf5ff5a0 100644
--- a/src/test/subscription/t/028_row_filter.pl
+++ b/src/test/subscription/t/028_row_filter.pl
@@ -579,6 +579,9 @@ is($result, qq(3|6),
 # commands are for testing normal logical replication behavior.
 #
 # test row filter (INSERT, UPDATE, DELETE)
+my $initial_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')");
 $node_publisher->safe_psql('postgres',
@@ -612,6 +615,14 @@ $node_publisher->safe_psql('postgres',
 
 $node_publisher->wait_for_catchup($appname);
 
+# Changes that do not pass the row filter are filtered out. Make sure that
+# plugin_filtered_bytes reflects that. We cannot test the exact value of
+# plugin_filtered_bytes since it is affected by background activity.
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+	'plugin_filtered_bytes increased after row filtering');
+
 # Check expected replicated rows for tab_rowfilter_2
 # tap_pub_1 filter is: (c % 2 = 0)
 # tap_pub_2 filter is: (c % 3 = 0)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index bb4e1b37005..66678b11066 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1832,6 +1832,7 @@ OuterJoinClauseInfo
 OutputPluginCallbacks
 OutputPluginOptions
 OutputPluginOutputType
+OutputPluginStats
 OverridingKind
 PACE_HEADER
 PACL

base-commit: 3e8e05596a020f043f1efd6406e4511ea85170bd
-- 
2.34.1
