Hi All,

On Tue, Nov 18, 2025 at 4:14 PM Amit Kapila <[email protected]> wrote:
>
> On Tue, Nov 18, 2025 at 4:05 PM Ashutosh Bapat
> <[email protected]> wrote:
> >
> > On Tue, Nov 18, 2025 at 3:24 PM Amit Kapila <[email protected]> wrote:
> > >
> > > On Tue, Nov 4, 2025 at 4:29 PM Ashutosh Bapat
> > > <[email protected]> wrote:
> > > >
> > > > a. sentTxns and filteredBytes need to be modified in the output plugin
> > > > code. The behaviour there is inherently output plugin specific, and
> > > > requires output plugin specific implementation.
> > > >
> > >
> > > Is it possible that we allow change callback (LogicalDecodeChangeCB)
> > > to return a boolean such that if the change is decoded and sent, it
> > > returns true, otherwise, false? If so, the caller could deduce from it
> > > the filtered bytes, and if none of the change calls returns true, this
> > > means the entire transaction is not sent.
> > >
> > > I think this should address Andres's concern of explicitly tracking
> > > these stats in plugins, what do you think?
> > >
> >
> > I was thinking about a similar thing. But I am skeptical since the
> > calling logic is not straight forward - there's an indirection in
> > between. Second, it means that all the plugins have to adapt to the
> > new callback definition. It is optional in my current approach. Since
> > both of us have thought of this approach, I think it's worth a try.
> >
> > "if none of the change calls returns true, this means the entire
> > transaction is not sent" isn't true. A plugin may still send an empty
> > transaction. I was thinking of making commit/abort/prepare callbacks
> > to return true/false to indicate whether a transaction was sent or not
> > and increment the counter accordingly. The plugin has to take care of
> > not returning true for both prepare and commit or prepare and abort.
> > So may be just commit and abort should be made to return true or
> > false. What do you think?
> >
>
> Sounds reasonable to me.

Sorry for the delayed response. PFA the patch implementing the idea
discussed above. It relies on the output plugin callback to return
correct boolean but maintains the statistics in the core itself.

I have reviewed all the previous comments and applied the ones which
are relevant to the new approach again. Following two are worth noting
here.

In order to address Amit's concern [1] that an inaccuracy in these
counts because of a bug in output plugin code may be blamed on the
core, I have added a note in the documentation of view
pg_stat_replication_slot in order to avoid such a blame and also
directing users to plugin they should investigate.

With the statistics being maintained by the core, Bertrand's concern
about stale statistics [2] are also addressed. Also it does not have
the asymmetry mentioned in point 2 in [3].

Please review.

[1] 
https://www.postgresql.org/message-id/CAA4eK1KzYaq9dcaa20Pv44ewomUPj_PbbeLfEnvzuXYMZtNw0A%40mail.gmail.com
[2] 
https://www.postgresql.org/message-id/[email protected]
[3] 
https://www.postgresql.org/message-id/CAExHW5tfVHABuv1moL_shp7oPrWmg8ha7T8CqwZxiMrKror7iw%40mail.gmail.com

-- 
Best Wishes,
Ashutosh Bapat
From f15d7ca6075f5364713582f7714d166c22b36e88 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <[email protected]>
Date: Fri, 27 Jun 2025 09:16:23 +0530
Subject: [PATCH v20251211] Report output plugin statistics in
 pg_stat_replication_slots

As of now pg_stat_replication_slots reports statistics about the reorder
buffer, but it does not report output statistics like the amount of data
filtered by the output plugin, amount of data sent downstream or the
number of transactions sent downstream. This statistics is useful when
investigating issues related to a slow downstream.

This commit adds following fields to pg_stat_replication_slots
- filtered_bytes is the amount of changes filtered out by the
  output plugin
- sent_txns is the amount of transactions sent downstream by the
  output plugin
- sent_bytes is the amount of data sent downstream by the output
  plugin.

Filtered bytes are reported next to total_bytes to keep these two
closely related fields together.

Though these counts are stored and maintained by the core, they require
the output plugin to return correct values to the relevant callbacks.
This has been added as note in the documentation. In order to aid
debugging descripancies in these counter, name of the output plugin is
added to the view for an easy reference.

total_bytes and total_txns are the only fields remaining unqualified -
they do not convey what those bytes and txns are. Hence rename them
total_wal_bytes and total_wal_txns respectively to indicate that those
counts come from WAL stream.

Author: Ashutosh Bapat <[email protected]>
Reviewed-by: Shveta Malik <[email protected]>
Reviewed-by: Bertrand Drouvot <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Ashutosh Sharma <[email protected]>
Discussion: https://www.postgresql.org/message-id/CAExHW5s6KntzUyUoMbKR5dgwRmdV2Ay_2+AnTgYGAzo=qv6...@mail.gmail.com
---
 contrib/test_decoding/expected/stats.out      | 77 ++++++++-------
 contrib/test_decoding/sql/stats.sql           | 16 +++-
 contrib/test_decoding/t/001_repl_stats.pl     | 22 +++--
 contrib/test_decoding/test_decoding.c         | 55 ++++++-----
 doc/src/sgml/logicaldecoding.sgml             | 96 ++++++++++++++-----
 doc/src/sgml/monitoring.sgml                  | 70 ++++++++++++--
 src/backend/catalog/system_views.sql          |  8 +-
 src/backend/replication/logical/logical.c     | 44 ++++++---
 .../replication/logical/logicalfuncs.c        |  8 ++
 .../replication/logical/reorderbuffer.c       |  6 +-
 src/backend/replication/pgoutput/pgoutput.c   | 65 +++++++++----
 src/backend/replication/walsender.c           |  3 +
 src/backend/utils/activity/pgstat_replslot.c  |  7 +-
 src/backend/utils/adt/pgstatfuncs.c           | 35 ++++---
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/pgstat.h                          |  7 +-
 src/include/replication/output_plugin.h       | 43 +++++----
 src/include/replication/reorderbuffer.h       |  6 ++
 src/test/recovery/t/006_logical_decoding.pl   | 12 +--
 .../t/035_standby_logical_decoding.pl         |  4 +-
 src/test/regress/expected/rules.out           | 10 +-
 src/test/subscription/t/001_rep_changes.pl    | 11 +++
 src/test/subscription/t/010_truncate.pl       | 20 ++++
 src/test/subscription/t/028_row_filter.pl     | 11 +++
 24 files changed, 452 insertions(+), 190 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index a9ead3c41aa..54079b7d83d 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -37,12 +37,17 @@ SELECT pg_stat_force_next_flush();
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t          | t           | t          | t           | t
- regression_slot_stats2 | t          | t           | t          | t           | t
- regression_slot_stats3 | t          | t           | t          | t           | t
+-- total_wal_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped. Usually we
+-- expect filtered_bytes to be 0 since the entire transaction executed by this
+-- test is replicated. But there may be some background transactions, changes
+-- from which are filtered out by the output plugin, so we check for >= 0 here.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes > 0 AS sent_bytes, filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_wal_txns | total_wal_bytes | sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count 
+------------------------+------------+-------------+----------------+-----------------+-----------+------------+----------------+--------------------
+ regression_slot_stats1 | t          | t           | t              | t               |         1 | t          | t              | t
+ regression_slot_stats2 | t          | t           | t              | t               |         1 | t          | t              | t
+ regression_slot_stats3 | t          | t           | t              | t               |         1 | t          | t              | t
 (3 rows)
 
 RESET logical_decoding_work_mem;
@@ -53,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t          | t           | f          | f           | t
- regression_slot_stats2 | t          | t           | t          | t           | t
- regression_slot_stats3 | t          | t           | t          | t           | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes > 0 AS sent_bytes, filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_wal_txns | total_wal_bytes | sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count 
+------------------------+------------+-------------+----------------+-----------------+-----------+------------+----------------+--------------------
+ regression_slot_stats1 | t          | t           | f              | f               |         0 | f          | t              | t
+ regression_slot_stats2 | t          | t           | t              | t               |         1 | t          | t              | t
+ regression_slot_stats3 | t          | t           | t              | t               |         1 | t          | t              | t
 (3 rows)
 
 -- reset stats for all slots
@@ -68,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL);
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t          | t           | f          | f           | t
- regression_slot_stats2 | t          | t           | f          | f           | t
- regression_slot_stats3 | t          | t           | f          | f           | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes, filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_wal_txns | total_wal_bytes | sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count 
+------------------------+------------+-------------+----------------+-----------------+-----------+------------+----------------+--------------------
+ regression_slot_stats1 | t          | t           | f              | f               |         0 |          0 |              0 | t
+ regression_slot_stats2 | t          | t           | f              | f               |         0 |          0 |              0 | t
+ regression_slot_stats3 | t          | t           | f              | f               |         0 |          0 |              0 | t
 (3 rows)
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+--------------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 |                   0 |                    | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | filtered_bytes | sent_txns | sent_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+----------------+-----------+------------+---------------------+--------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |              0 |               0 |              0 |         0 |          0 |                   0 |                    | 
 (1 row)
 
 SELECT pg_stat_reset_replication_slot('do-not-exist');
 ERROR:  replication slot "do-not-exist" does not exist
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+--------------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 |                   0 |                    | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | filtered_bytes | sent_txns | sent_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+----------------+-----------+------------+---------------------+--------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |              0 |               0 |              0 |         0 |          0 |                   0 |                    | 
 (1 row)
 
 -- spilling the xact
@@ -121,20 +126,20 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count,
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
 BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
-       slot_name        
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+       slot_name        |    plugin     
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
 (3 rows)
 
-SELECT slot_name FROM pg_stat_replication_slots;
-       slot_name        
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+       slot_name        |    plugin     
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
 (3 rows)
 
 COMMIT;
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 6661dbcb85c..17e7c0e8f88 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -15,16 +15,22 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL,
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+
+-- total_wal_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped. Usually we
+-- expect filtered_bytes to be 0 since the entire transaction executed by this
+-- test is replicated. But there may be some background transactions, changes
+-- from which are filtered out by the output plugin, so we check for >= 0 here.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes > 0 AS sent_bytes, filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 RESET logical_decoding_work_mem;
 
 -- reset stats for one slot, others should be unaffected
 SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes > 0 AS sent_bytes, filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- reset stats for all slots
 SELECT pg_stat_reset_replication_slot(NULL);
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes, filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
@@ -46,8 +52,8 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count,
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
 BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
-SELECT slot_name FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
 COMMIT;
 
 
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 0de62edb7d8..89d3ff0a239 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -23,10 +23,16 @@ sub test_slot_stats
 
 	my ($node, $expected, $msg) = @_;
 
+	# If there are background transactions which are filtered out by the output
+	# plugin, filtered_bytes may be greater than 0. But it's not guaranteed that
+	# such transactions would be present.
 	my $result = $node->safe_psql(
 		'postgres', qq[
-		SELECT slot_name, total_txns > 0 AS total_txn,
-			   total_bytes > 0 AS total_bytes
+		SELECT slot_name, total_wal_txns > 0 AS total_txn,
+			   total_wal_bytes > 0 AS total_bytes,
+			   sent_txns > 0 AS sent_txn,
+			   sent_bytes > 0 AS sent_bytes,
+			   filtered_bytes >= 0 AS filtered_bytes
 			   FROM pg_stat_replication_slots
 			   ORDER BY slot_name]);
 	is($result, $expected, $msg);
@@ -65,7 +71,7 @@ $node->poll_query_until(
 	'postgres', qq[
 	SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
 	WHERE slot_name ~ 'regression_slot'
-	AND total_txns > 0 AND total_bytes > 0;
+	AND total_wal_txns > 0 AND total_wal_bytes > 0;
 ]) or die "Timed out while waiting for statistics to be updated";
 
 # Test to drop one of the replication slot and verify replication statistics data is
@@ -80,9 +86,9 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t
-regression_slot3|t|t),
+	qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t
+regression_slot3|t|t|t|t|t),
 	'check replication statistics are updated');
 
 # Test to remove one of the replication slots and adjust
@@ -104,8 +110,8 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t),
+	qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t),
 	'check replication statistics after removing the slot file');
 
 # cleanup
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 47094f86f5f..69ad9599804 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -60,12 +60,12 @@ static void pg_output_begin(LogicalDecodingContext *ctx,
 							TestDecodingData *data,
 							ReorderBufferTXN *txn,
 							bool last_write);
-static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
+static bool pg_decode_commit_txn(LogicalDecodingContext *ctx,
 								 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
-static void pg_decode_change(LogicalDecodingContext *ctx,
+static bool pg_decode_change(LogicalDecodingContext *ctx,
 							 ReorderBufferTXN *txn, Relation relation,
 							 ReorderBufferChange *change);
-static void pg_decode_truncate(LogicalDecodingContext *ctx,
+static bool pg_decode_truncate(LogicalDecodingContext *ctx,
 							   ReorderBufferTXN *txn,
 							   int nrelations, Relation relations[],
 							   ReorderBufferChange *change);
@@ -80,7 +80,7 @@ static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
-static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+static bool pg_decode_prepare_txn(LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn,
 								  XLogRecPtr prepare_lsn);
 static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
@@ -98,16 +98,16 @@ static void pg_output_stream_start(LogicalDecodingContext *ctx,
 								   bool last_write);
 static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn);
-static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_abort(LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn,
 								   XLogRecPtr abort_lsn);
-static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_prepare(LogicalDecodingContext *ctx,
 									 ReorderBufferTXN *txn,
 									 XLogRecPtr prepare_lsn);
-static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_commit(LogicalDecodingContext *ctx,
 									ReorderBufferTXN *txn,
 									XLogRecPtr commit_lsn);
-static void pg_decode_stream_change(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_change(LogicalDecodingContext *ctx,
 									ReorderBufferTXN *txn,
 									Relation relation,
 									ReorderBufferChange *change);
@@ -115,7 +115,7 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
 									 ReorderBufferTXN *txn, XLogRecPtr lsn,
 									 bool transactional, const char *prefix,
 									 Size sz, const char *message);
-static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn,
 									  int nrelations, Relation relations[],
 									  ReorderBufferChange *change);
@@ -318,7 +318,7 @@ pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBuff
 }
 
 /* COMMIT callback */
-static void
+static bool
 pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
@@ -330,7 +330,7 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	txn->output_plugin_private = NULL;
 
 	if (data->skip_empty_xacts && !xact_wrote_changes)
-		return;
+		return false;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
@@ -343,6 +343,7 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						 timestamptz_to_str(txn->commit_time));
 
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
 /* BEGIN PREPARE callback */
@@ -367,7 +368,7 @@ pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 }
 
 /* PREPARE callback */
-static void
+static bool
 pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					  XLogRecPtr prepare_lsn)
 {
@@ -379,7 +380,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * where the first operation is received for this transaction.
 	 */
 	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
-		return;
+		return false;
 
 	OutputPluginPrepareWrite(ctx, true);
 
@@ -394,6 +395,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						 timestamptz_to_str(txn->prepare_time));
 
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
 /* COMMIT PREPARED callback */
@@ -599,7 +601,7 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 /*
  * callback for individual changed tuples
  */
-static void
+static bool
 pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 Relation relation, ReorderBufferChange *change)
 {
@@ -684,9 +686,10 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContextReset(data->context);
 
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
-static void
+static bool
 pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				   int nrelations, Relation relations[], ReorderBufferChange *change)
 {
@@ -739,6 +742,7 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContextReset(data->context);
 
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
 static void
@@ -818,7 +822,7 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
-static void
+static bool
 pg_decode_stream_abort(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn,
 					   XLogRecPtr abort_lsn)
@@ -842,7 +846,7 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 	}
 
 	if (data->skip_empty_xacts && !xact_wrote_changes)
-		return;
+		return false;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
@@ -850,9 +854,10 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 	else
 		appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
-static void
+static bool
 pg_decode_stream_prepare(LogicalDecodingContext *ctx,
 						 ReorderBufferTXN *txn,
 						 XLogRecPtr prepare_lsn)
@@ -861,7 +866,7 @@ pg_decode_stream_prepare(LogicalDecodingContext *ctx,
 	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
 	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
-		return;
+		return false;
 
 	OutputPluginPrepareWrite(ctx, true);
 
@@ -877,9 +882,10 @@ pg_decode_stream_prepare(LogicalDecodingContext *ctx,
 						 timestamptz_to_str(txn->prepare_time));
 
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
-static void
+static bool
 pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						ReorderBufferTXN *txn,
 						XLogRecPtr commit_lsn)
@@ -892,7 +898,7 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 	txn->output_plugin_private = NULL;
 
 	if (data->skip_empty_xacts && !xact_wrote_changes)
-		return;
+		return false;
 
 	OutputPluginPrepareWrite(ctx, true);
 
@@ -906,6 +912,7 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						 timestamptz_to_str(txn->commit_time));
 
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
 /*
@@ -913,7 +920,7 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
  * at a later point in time.  We don't want users to see the changes until the
  * transaction is committed.
  */
-static void
+static bool
 pg_decode_stream_change(LogicalDecodingContext *ctx,
 						ReorderBufferTXN *txn,
 						Relation relation,
@@ -935,6 +942,7 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 	else
 		appendStringInfoString(ctx->out, "streaming change for transaction");
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
 /*
@@ -981,7 +989,7 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
  * In streaming mode, we don't display the detailed information of Truncate.
  * See pg_decode_stream_change.
  */
-static void
+static bool
 pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						  int nrelations, Relation relations[],
 						  ReorderBufferChange *change)
@@ -1001,4 +1009,5 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	else
 		appendStringInfoString(ctx->out, "streaming truncate for transaction");
 	OutputPluginWrite(ctx, true);
+	return true;
 }
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index d5a5e22fe2c..886c7d940df 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -981,10 +981,15 @@ typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
       rows will have been called before this, if there have been any modified
       rows.
 <programlisting>
-typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr commit_lsn);
 </programlisting>
+      If the callback outputs the transaction, it is expected to return true;
+      otherwise false. The return value is used to update the
+      <literal>sent_txns</literal> counter reported in <link
+      linkend="monitoring-pg-stat-replication-slots-view">
+      <structname>pg_stat_replication_slots</structname></link> view.
      </para>
     </sect3>
 
@@ -1005,7 +1010,7 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
       this very same transaction. In that case, the logical decoding of this
       aborted transaction is stopped gracefully.
 <programlisting>
-typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        Relation relation,
                                        ReorderBufferChange *change);
@@ -1015,8 +1020,12 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
       and <function>commit_cb</function> callbacks, but additionally the
       relation descriptor <parameter>relation</parameter> points to the
       relation the row belongs to and a struct
-      <parameter>change</parameter> describing the row modification are passed
-      in.
+      <parameter>change</parameter> describing the row modification are passed in.
+      If the output plugin decoded and output the change, it is expected
+      to return true; otherwise false. This return value is used to update the
+      <structfield>filtered_bytes</structfield> counter reported in
+      <link linkend="monitoring-pg-stat-replication-slots-view">
+      <structname>pg_stat_replication_slots</structname></link> view.
      </para>
 
      <note>
@@ -1036,18 +1045,18 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
       The optional <function>truncate_cb</function> callback is called for a
       <command>TRUNCATE</command> command.
 <programlisting>
-typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                          ReorderBufferTXN *txn,
                                          int nrelations,
                                          Relation relations[],
                                          ReorderBufferChange *change);
 </programlisting>
-      The parameters are analogous to the <function>change_cb</function>
-      callback.  However, because <command>TRUNCATE</command> actions on
-      tables connected by foreign keys need to be executed together, this
-      callback receives an array of relations instead of just a single one.
-      See the description of the <xref linkend="sql-truncate"/> statement for
-      details.
+      The parameters and the expected return value are analogous to the
+      <function>change_cb</function> callback.  However, because
+      <command>TRUNCATE</command> actions on tables connected by foreign keys
+      need to be executed together, this callback receives an array of relations
+      instead of just a single one.  See the description of the <xref
+      linkend="sql-truncate"/> statement for details.
      </para>
     </sect3>
 
@@ -1180,8 +1189,18 @@ typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
       rows will have been called before this, if there have been any modified
       rows. The <parameter>gid</parameter> field, which is part of the
       <parameter>txn</parameter> parameter, can be used in this callback.
+      If the callback outputs the prepared transaction, it is expected to return
+      true; otherwise false. The return value is used to update the
+      <structfield>sent_txns</structfield> counter reported in
+      <link linkend="monitoring-pg-stat-replication-slots-view">
+      <structname>pg_stat_replication_slots</structname></link> view. Please
+      note that the return value of this callback suffices to determine
+      whether a prepared transaction was output or not; callbacks
+      <function>commit_prepared_cb</function> and
+      <function>rollback_prepared_cb</function> do not need to return this
+      status again.
 <programlisting>
-typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         XLogRecPtr prepare_lsn);
 </programlisting>
@@ -1255,9 +1274,14 @@ typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
      <title>Stream Abort Callback</title>
      <para>
       The required <function>stream_abort_cb</function> callback is called to
-      abort a previously streamed transaction.
+      abort a previously streamed transaction. If the output plugin has output
+      the streamed transaction, the callback is expected to return true;
+      otherwise false. The return value is used to update the
+      <structfield>sent_txns</structfield> counter reported in
+      <link linkend="monitoring-pg-stat-replication-slots-view">
+      <structname>pg_stat_replication_slots</structname></link> view.
 <programlisting>
-typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr abort_lsn);
 </programlisting>
@@ -1270,9 +1294,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
       The <function>stream_prepare_cb</function> callback is called to prepare
       a previously streamed transaction as part of a two-phase commit. This
       callback is required when the output plugin supports both the streaming
-      of large in-progress transactions and two-phase commits.
+      of large in-progress transactions and two-phase commits. If the output
+      plugin has output the streamed transaction, the callback is expected to
+      return true; otherwise false. The return value is used to update the
+      <structfield>sent_txns</structfield> counter reported in
+      <link linkend="monitoring-pg-stat-replication-slots-view">
+      <structname>pg_stat_replication_slots</structname></link> view. Please
+      note that only the return value of this callback suffices to determine
+      whether a prepared transaction was output or not; callbacks
+      <function>commit_prepared_cb</function> and
+      <function>rollback_prepared_cb</function> do not need to return this
+      status again.
       <programlisting>
-typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr prepare_lsn);
 </programlisting>
@@ -1283,9 +1317,14 @@ typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx
      <title>Stream Commit Callback</title>
      <para>
       The required <function>stream_commit_cb</function> callback is called to
-      commit a previously streamed transaction.
+      commit a previously streamed transaction. If the output plugin
+      has output the streamed transaction, the callback is expected to return
+      true; otherwise false. The return value is used to update the
+      <structfield>sent_txns</structfield> counter reported in
+      <link linkend="monitoring-pg-stat-replication-slots-view">
+      <structname>pg_stat_replication_slots</structname></link> view.
 <programlisting>
-typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr commit_lsn);
 </programlisting>
@@ -1298,10 +1337,15 @@ typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
       The required <function>stream_change_cb</function> callback is called
       when sending a change in a block of streamed changes (demarcated by
       <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
+      If the output plugin decoded and output the change, it is expected to
+      return true. Otherwise it is expected to return false. This return value
+      is used to update the <structfield>filtered_bytes</structfield> counter
+      reported in <link linkend="monitoring-pg-stat-replication-slots-view">
+      <structname>pg_stat_replication_slots</structname></link> view.
       The actual changes are not displayed as the transaction can abort at a later
       point in time and we don't decode changes for aborted transactions.
 <programlisting>
-typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              Relation relation,
                                              ReorderBufferChange *change);
@@ -1338,18 +1382,18 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
       (demarcated by <function>stream_start_cb</function> and
       <function>stream_stop_cb</function> calls).
 <programlisting>
-typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                                ReorderBufferTXN *txn,
                                                int nrelations,
                                                Relation relations[],
                                                ReorderBufferChange *change);
 </programlisting>
-      The parameters are analogous to the <function>stream_change_cb</function>
-      callback.  However, because <command>TRUNCATE</command> actions on
-      tables connected by foreign keys need to be executed together, this
-      callback receives an array of relations instead of just a single one.
-      See the description of the <xref linkend="sql-truncate"/> statement for
-      details.
+      The parameters and the return value are analogous to the
+      <function>stream_change_cb</function> callback.  However, because
+      <command>TRUNCATE</command> actions on tables connected by foreign keys
+      need to be executed together, this callback receives an array of relations
+      instead of just a single one.  See the description of the <xref
+      linkend="sql-truncate"/> statement for details.
      </para>
     </sect3>
 
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 817fd9f4ca7..d3710e762e4 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1547,6 +1547,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>plugin</structfield> <type>text</type>
+       </para>
+       <para>
+        The base name of the shared object containing the output plugin this
+        logical slot is using. This column is same as the one in
+        <structname>pg_replication_slots</structname>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>spill_txns</structfield> <type>bigint</type>
@@ -1635,19 +1646,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-        <structfield>total_txns</structfield> <type>bigint</type>
+        <structfield>total_wal_txns</structfield> <type>bigint</type>
        </para>
        <para>
-        Number of decoded transactions sent to the decoding output plugin for
-        this slot. This counts top-level transactions only, and is not incremented
-        for subtransactions. Note that this includes the transactions that are
-        streamed and/or spilled.
+        Number of decoded transactions from WAL sent to the decoding output
+        plugin for this slot. This counts top-level transactions only, and is
+        not incremented for subtransactions. Note that this includes the
+        transactions that are streamed and/or spilled.
        </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-        <structfield>total_bytes</structfield><type>bigint</type>
+        <structfield>total_wal_bytes</structfield><type>bigint</type>
        </para>
        <para>
         Amount of transaction data decoded for sending transactions to the
@@ -1657,6 +1668,42 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>filtered_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of changes, from <structfield>total_wal_bytes</structfield>, filtered
+        out by the output plugin and not sent downstream. Please note that it
+        does not include the changes filtered before a change is sent to
+        the output plugin, e.g. the changes filtered by origin.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>sent_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent downstream for this slot. This
+        counts top-level transactions only, and is not incremented for
+        subtransactions. These transactions are subset of transactions sent to
+        the decoding plugin. Hence this count is expected to be less than or
+        equal to <structfield>total_wal_txns</structfield>.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>sent_bytes</structfield><type>bigint</type>
+       </para>
+       <para>
+        Amount of transaction changes, in the output format, sent downstream for
+        this slot by the output plugin.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>slotsync_skip_count</structfield><type>bigint</type>
@@ -1693,6 +1740,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
    </tgroup>
   </table>
 
+  <note>
+   <para>
+    The accuracy of columns <structfield>filtered_bytes</structfield>, and
+    <structfield>sent_txns</structfield> depends upon the accuracy of return values
+    from respective callbacks associated with those counts as mentioned in <xref
+    linkend="logicaldecoding-output-plugin-callbacks"/>. A descripancy in those
+    counts may be result of incorrect implementation of those callbacks in the
+    output plugin given by column <structfield>plugin</structfield>.
+   </para>
+  </note>
+
  </sect2>
 
  <sect2 id="monitoring-pg-stat-wal-receiver-view">
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 0a0f95f6bb9..2b30361d32a 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1068,6 +1068,7 @@ CREATE VIEW pg_replication_slots AS
 CREATE VIEW pg_stat_replication_slots AS
     SELECT
             s.slot_name,
+            r.plugin,
             s.spill_txns,
             s.spill_count,
             s.spill_bytes,
@@ -1075,8 +1076,11 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_count,
             s.stream_bytes,
             s.mem_exceeded_count,
-            s.total_txns,
-            s.total_bytes,
+            s.total_wal_txns,
+            s.total_wal_bytes,
+            s.filtered_bytes,
+            s.sent_txns,
+            s.sent_bytes,
             s.slotsync_skip_count,
             s.slotsync_last_skip,
             s.stats_reset
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1b11ed63dc6..c65399a9c3d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -888,7 +888,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->end_xact = true;
 
 	/* do the actual work: call callback */
-	ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
+	if (ctx->callbacks.commit_cb(ctx, txn, commit_lsn))
+		cache->sentTxns++;
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
@@ -984,7 +985,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						"prepare_cb")));
 
 	/* do the actual work: call callback */
-	ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+	if (ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn))
+		cache->sentTxns++;
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
@@ -1115,7 +1117,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	ctx->end_xact = false;
 
-	ctx->callbacks.change_cb(ctx, txn, relation, change);
+	if (!ctx->callbacks.change_cb(ctx, txn, relation, change))
+		cache->filteredBytes += ReorderBufferChangeSize(change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
@@ -1157,7 +1160,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	ctx->end_xact = false;
 
-	ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
+	if (!ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change))
+		cache->filteredBytes += ReorderBufferChangeSize(change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
@@ -1396,7 +1400,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				 errmsg("logical streaming requires a %s callback",
 						"stream_abort_cb")));
 
-	ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
+	if (ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn))
+		cache->sentTxns++;
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
@@ -1441,7 +1446,8 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				 errmsg("logical streaming at prepare time requires a %s callback",
 						"stream_prepare_cb")));
 
-	ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
+	if (ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn))
+		cache->sentTxns++;
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
@@ -1482,7 +1488,8 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				 errmsg("logical streaming requires a %s callback",
 						"stream_commit_cb")));
 
-	ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
+	if (ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn))
+		cache->sentTxns++;
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
@@ -1531,7 +1538,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				 errmsg("logical streaming requires a %s callback",
 						"stream_change_cb")));
 
-	ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
+	if (!ctx->callbacks.stream_change_cb(ctx, txn, relation, change))
+		cache->filteredBytes += ReorderBufferChangeSize(change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
@@ -1619,7 +1627,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	ctx->end_xact = false;
 
-	ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
+	if (!ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change))
+		cache->filteredBytes += ReorderBufferChangeSize(change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
@@ -1959,7 +1968,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		rb->memExceededCount <= 0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
 		 rb,
 		 rb->spillTxns,
 		 rb->spillCount,
@@ -1969,7 +1978,10 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 rb->streamBytes,
 		 rb->memExceededCount,
 		 rb->totalTxns,
-		 rb->totalBytes);
+		 rb->totalBytes,
+		 rb->sentTxns,
+		 rb->sentBytes,
+		 rb->filteredBytes);
 
 	repSlotStat.spill_txns = rb->spillTxns;
 	repSlotStat.spill_count = rb->spillCount;
@@ -1978,8 +1990,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	repSlotStat.stream_count = rb->streamCount;
 	repSlotStat.stream_bytes = rb->streamBytes;
 	repSlotStat.mem_exceeded_count = rb->memExceededCount;
-	repSlotStat.total_txns = rb->totalTxns;
-	repSlotStat.total_bytes = rb->totalBytes;
+	repSlotStat.total_wal_txns = rb->totalTxns;
+	repSlotStat.total_wal_bytes = rb->totalBytes;
+	repSlotStat.sent_txns = rb->sentTxns;
+	repSlotStat.sent_bytes = rb->sentBytes;
+	repSlotStat.filtered_bytes = rb->filteredBytes;
 
 	pgstat_report_replslot(ctx->slot, &repSlotStat);
 
@@ -1992,6 +2007,9 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->memExceededCount = 0;
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
+	rb->sentTxns = 0;
+	rb->sentBytes = 0;
+	rb->filteredBytes = 0;
 }
 
 /*
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index cf77ee28dfe..0acbda94941 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -65,6 +65,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	Datum		values[3];
 	bool		nulls[3];
 	DecodingOutputState *p;
+	int64		sentBytes = 0;
 
 	/* SQL Datums can only be of a limited length... */
 	if (ctx->out->len > MaxAllocSize - VARHDRSZ)
@@ -74,7 +75,9 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 
 	memset(nulls, 0, sizeof(nulls));
 	values[0] = LSNGetDatum(lsn);
+	sentBytes += sizeof(XLogRecPtr);
 	values[1] = TransactionIdGetDatum(xid);
+	sentBytes += sizeof(TransactionId);
 
 	/*
 	 * Assert ctx->out is in database encoding when we're writing textual
@@ -87,8 +90,13 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 
 	/* ick, but cstring_to_text_with_len works for bytea perfectly fine */
 	values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
+	sentBytes += ctx->out->len;
 
 	tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
+
+	/* Update the amount of data sent downstream. */
+	ctx->reorder->sentBytes += sentBytes;
+
 	p->returned_rows++;
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index f18c6fb52b5..e4e65688235 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -310,7 +310,6 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
  * memory accounting
  * ---------------------------------------
  */
-static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											ReorderBufferTXN *txn,
@@ -393,6 +392,9 @@ ReorderBufferAllocate(void)
 	buffer->memExceededCount = 0;
 	buffer->totalTxns = 0;
 	buffer->totalBytes = 0;
+	buffer->sentTxns = 0;
+	buffer->sentBytes = 0;
+	buffer->filteredBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -4455,7 +4457,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 /*
  * Size of a change in memory.
  */
-static Size
+Size
 ReorderBufferChangeSize(ReorderBufferChange *change)
 {
 	Size		sz = sizeof(ReorderBufferChange);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 787998abb8a..8377c2ea464 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -46,12 +46,12 @@ static void pgoutput_startup(LogicalDecodingContext *ctx,
 static void pgoutput_shutdown(LogicalDecodingContext *ctx);
 static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
 							   ReorderBufferTXN *txn);
-static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
+static bool pgoutput_commit_txn(LogicalDecodingContext *ctx,
 								ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
-static void pgoutput_change(LogicalDecodingContext *ctx,
+static bool pgoutput_change(LogicalDecodingContext *ctx,
 							ReorderBufferTXN *txn, Relation relation,
 							ReorderBufferChange *change);
-static void pgoutput_truncate(LogicalDecodingContext *ctx,
+static bool pgoutput_truncate(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, int nrelations, Relation relations[],
 							  ReorderBufferChange *change);
 static void pgoutput_message(LogicalDecodingContext *ctx,
@@ -62,7 +62,7 @@ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
-static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+static bool pgoutput_prepare_txn(LogicalDecodingContext *ctx,
 								 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
 										 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
@@ -74,13 +74,13 @@ static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn);
 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
 								 ReorderBufferTXN *txn);
-static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+static bool pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn,
 								  XLogRecPtr abort_lsn);
-static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+static bool pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn,
 								   XLogRecPtr commit_lsn);
-static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+static bool pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 
 static bool publications_valid;
@@ -624,7 +624,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 /*
  * COMMIT callback
  */
-static void
+static bool
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
@@ -645,12 +645,13 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (!sent_begin_txn)
 	{
 		elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
-		return;
+		return false;
 	}
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
 /*
@@ -673,7 +674,7 @@ pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 /*
  * PREPARE callback
  */
-static void
+static bool
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
@@ -682,6 +683,7 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
 /*
@@ -1476,7 +1478,7 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
  *
  * This is called both in streaming and non-streaming modes.
  */
-static void
+static bool
 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				Relation relation, ReorderBufferChange *change)
 {
@@ -1490,9 +1492,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
+	bool		result;
 
 	if (!is_publishable_relation(relation))
-		return;
+		return false;
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1510,15 +1513,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
-				return;
+				return false;
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
-				return;
+				return false;
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
-				return;
+				return false;
 
 			/*
 			 * This is only possible if deletes are allowed even when replica
@@ -1528,7 +1531,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			if (!change->data.tp.oldtuple)
 			{
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
-				return;
+				return false;
 			}
 			break;
 		default:
@@ -1583,7 +1586,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * of the row filter for old and new tuple.
 	 */
 	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+	{
+		result = false;
 		goto cleanup;
+	}
+
+	/*
+	 * Even if we filter some columns while sending the message we are not
+	 * filtering the change as a whole. Hence we will return true.
+	 */
+	result = true;
 
 	/*
 	 * Send BEGIN if we haven't yet.
@@ -1646,9 +1658,10 @@ cleanup:
 
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
+	return result;
 }
 
-static void
+static bool
 pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				  int nrelations, Relation relations[], ReorderBufferChange *change)
 {
@@ -1660,6 +1673,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	int			nrelids;
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
+	bool		result = false;
 
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (data->in_streaming)
@@ -1710,10 +1724,18 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 								  change->data.truncate.cascade,
 								  change->data.truncate.restart_seqs);
 		OutputPluginWrite(ctx, true);
+
+		/*
+		 * Even if we filtered out some relations, we still send a TRUNCATE
+		 * message for the remaining relations. Since the change, as a whole,
+		 * is not filtered out we return true.
+		 */
+		result = true;
 	}
 
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
+	return result;
 }
 
 static void
@@ -1885,7 +1907,7 @@ pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
  * Notify downstream to discard the streamed transaction (along with all
  * its subtransactions, if it's a toplevel transaction).
  */
-static void
+static bool
 pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn,
 					  XLogRecPtr abort_lsn)
@@ -1912,13 +1934,14 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 
 	cleanup_rel_sync_cache(toptxn->xid, false);
+	return true;
 }
 
 /*
  * Notify downstream to apply the streamed transaction (along with all
  * its subtransactions).
  */
-static void
+static bool
 pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn,
 					   XLogRecPtr commit_lsn)
@@ -1939,6 +1962,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 
 	cleanup_rel_sync_cache(txn->xid, true);
+	return true;
 }
 
 /*
@@ -1946,7 +1970,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
  *
  * Notify the downstream to prepare the transaction.
  */
-static void
+static bool
 pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 							ReorderBufferTXN *txn,
 							XLogRecPtr prepare_lsn)
@@ -1957,6 +1981,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
+	return true;
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 449632ad1aa..8ff11f7e5c8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1587,6 +1587,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
 
+	/* Update the amount of data sent downstream. */
+	ctx->reorder->sentBytes += ctx->out->len + 1;	/* +1 for the 'd' */
+
 	CHECK_FOR_INTERRUPTS();
 
 	/* Try to flush pending output to the client */
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index d757e00eb54..541c39bd0cc 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -95,8 +95,11 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
 	REPLSLOT_ACC(stream_count);
 	REPLSLOT_ACC(stream_bytes);
 	REPLSLOT_ACC(mem_exceeded_count);
-	REPLSLOT_ACC(total_txns);
-	REPLSLOT_ACC(total_bytes);
+	REPLSLOT_ACC(total_wal_txns);
+	REPLSLOT_ACC(total_wal_bytes);
+	REPLSLOT_ACC(sent_txns);
+	REPLSLOT_ACC(sent_bytes);
+	REPLSLOT_ACC(filtered_bytes);
 #undef REPLSLOT_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index ef6fffe60b9..3752a89553c 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2129,7 +2129,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 13
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 16
 	text	   *slotname_text = PG_GETARG_TEXT_P(0);
 	NameData	slotname;
 	TupleDesc	tupdesc;
@@ -2156,15 +2156,21 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "mem_exceeded_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_txns",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_wal_txns",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_wal_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "slotsync_skip_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "filtered_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_last_skip",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "sent_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "sent_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 14, "slotsync_skip_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 15, "slotsync_last_skip",
 					   TIMESTAMPTZOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 16, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2188,19 +2194,22 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	values[5] = Int64GetDatum(slotent->stream_count);
 	values[6] = Int64GetDatum(slotent->stream_bytes);
 	values[7] = Int64GetDatum(slotent->mem_exceeded_count);
-	values[8] = Int64GetDatum(slotent->total_txns);
-	values[9] = Int64GetDatum(slotent->total_bytes);
-	values[10] = Int64GetDatum(slotent->slotsync_skip_count);
+	values[8] = Int64GetDatum(slotent->total_wal_txns);
+	values[9] = Int64GetDatum(slotent->total_wal_bytes);
+	values[10] = Int64GetDatum(slotent->filtered_bytes);
+	values[11] = Int64GetDatum(slotent->sent_txns);
+	values[12] = Int64GetDatum(slotent->sent_bytes);
+	values[13] = Int64GetDatum(slotent->slotsync_skip_count);
 
 	if (slotent->slotsync_last_skip == 0)
-		nulls[11] = true;
+		nulls[14] = true;
 	else
-		values[11] = TimestampTzGetDatum(slotent->slotsync_last_skip);
+		values[14] = TimestampTzGetDatum(slotent->slotsync_last_skip);
 
 	if (slotent->stat_reset_timestamp == 0)
-		nulls[12] = true;
+		nulls[15] = true;
 	else
-		values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+		values[15] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fd9448ec7b9..8cd14c88bdb 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5691,9 +5691,9 @@
 { oid => '6169', descr => 'statistics: information about replication slot',
   proname => 'pg_stat_get_replication_slot', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,slotsync_skip_count,slotsync_last_skip,stats_reset}',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_wal_txns,total_wal_bytes,filtered_bytes,sent_txns,sent_bytes,slotsync_skip_count,slotsync_last_skip,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
 
 { oid => '6230', descr => 'statistics: check if a stats object exists',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f23dd5870da..cc4e0a561b0 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -398,8 +398,11 @@ typedef struct PgStat_StatReplSlotEntry
 	PgStat_Counter stream_count;
 	PgStat_Counter stream_bytes;
 	PgStat_Counter mem_exceeded_count;
-	PgStat_Counter total_txns;
-	PgStat_Counter total_bytes;
+	PgStat_Counter total_wal_txns;
+	PgStat_Counter total_wal_bytes;
+	PgStat_Counter sent_txns;
+	PgStat_Counter sent_bytes;
+	PgStat_Counter filtered_bytes;
 	PgStat_Counter slotsync_skip_count;
 	TimestampTz slotsync_last_skip;
 	TimestampTz stat_reset_timestamp;
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 8d4d5b71887..8c27e8266e7 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -56,17 +56,19 @@ typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn);
 
 /*
- * Callback for every individual change in a successful transaction.
+ * Callback for every individual change in a successful transaction. Should
+ * return true if the change is output, false otherwise.
  */
-typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn,
 									   Relation relation,
 									   ReorderBufferChange *change);
 
 /*
- * Callback for every TRUNCATE in a successful transaction.
+ * Callback for every TRUNCATE in a successful transaction. Should return true if
+ * the change is output, false otherwise.
  */
-typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
 										 ReorderBufferTXN *txn,
 										 int nrelations,
 										 Relation relations[],
@@ -74,8 +76,9 @@ typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
 
 /*
  * Called for every (explicit or implicit) COMMIT of a successful transaction.
+ * Should return true if the transaction is output, false otherwise.
  */
-typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
@@ -118,10 +121,10 @@ typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
 											 ReorderBufferTXN *txn);
 
 /*
- * Called for PREPARE record unless it was filtered by filter_prepare()
- * callback.
+ * Called for PREPARE record unless it was filtered by filter_prepare() callback.
+ * Should return true if the transaction is output, false otherwise.
  */
-typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn,
 										XLogRecPtr prepare_lsn);
 
@@ -159,32 +162,35 @@ typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
 
 /*
  * Called to discard changes streamed to remote node from in-progress
- * transaction.
+ * transaction. Should return true if the transaction is output, false
+ * otherwise.
  */
-typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
 											ReorderBufferTXN *txn,
 											XLogRecPtr abort_lsn);
 
 /*
  * Called to prepare changes streamed to remote node from in-progress
- * transaction. This is called as part of a two-phase commit.
+ * transaction. This is called as part of a two-phase commit.  Should return true
+ * if the transaction is output, false otherwise.
  */
-typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
 											  ReorderBufferTXN *txn,
 											  XLogRecPtr prepare_lsn);
 
 /*
- * Called to apply changes streamed to remote node from in-progress
- * transaction.
+ * Called to apply changes streamed to remote node from in-progress transaction.
+ * Should return true if the transaction is output, false otherwise.
  */
-typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
 											 ReorderBufferTXN *txn,
 											 XLogRecPtr commit_lsn);
 
 /*
  * Callback for streaming individual changes from in-progress transactions.
+ * Should return true if the change is output, false otherwise.
  */
-typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
 											 ReorderBufferTXN *txn,
 											 Relation relation,
 											 ReorderBufferChange *change);
@@ -202,9 +208,10 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
 											  const char *message);
 
 /*
- * Callback for streaming truncates from in-progress transactions.
+ * Callback for streaming truncates from in-progress transactions. Should return
+ * true if the change is output, false otherwise.
  */
-typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
 											   ReorderBufferTXN *txn,
 											   int nrelations,
 											   Relation relations[],
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3cbe106a3c7..bd4c17da7ac 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -699,6 +699,11 @@ struct ReorderBuffer
 	 */
 	int64		totalTxns;		/* total number of transactions sent */
 	int64		totalBytes;		/* total amount of data decoded */
+	int64		sentTxns;		/* number of transactions decoded and sent
+								 * downstream */
+	int64		sentBytes;		/* amount of data decoded and sent downstream */
+	int64		filteredBytes;	/* amount of data filtered out by output
+								 * plugin */
 };
 
 
@@ -718,6 +723,7 @@ extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids);
 extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid,
 									 XLogRecPtr lsn, ReorderBufferChange *change,
 									 bool toast_insert);
+extern Size ReorderBufferChangeSize(ReorderBufferChange *change);
 extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 									  Snapshot snap, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 96b70b84d5e..c8ada58379b 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -214,10 +214,10 @@ my $stats_test_slot2 = 'logical_slot';
 # Stats exist for stats test slot 1
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT total_wal_bytes > 0, sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
+	qq(t|t|t),
+	qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
 );
 
 # Do reset of stats for stats test slot 1
@@ -235,10 +235,10 @@ $node_primary->safe_psql('postgres',
 
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT stats_reset > '$reset1'::timestamptz, total_wal_bytes = 0, sent_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.)
+	qq(t|t|t),
+	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_wal_bytes and sent_bytes were set to 0.)
 );
 
 # Check that test slot 2 has NULL in reset timestamp
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index ebe2fae1789..5f4df30d65a 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -577,7 +577,7 @@ $node_primary->safe_psql('testdb',
 	qq[INSERT INTO decoding_test(x,y) SELECT 100,'100';]);
 
 $node_standby->poll_query_until('testdb',
-	qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
+	qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
 ) or die "replication slot stats of vacuum_full_activeslot not updated";
 
 # This should trigger the conflict
@@ -605,7 +605,7 @@ ok( $stderr =~
 # Ensure that replication slot stats are not removed after invalidation.
 is( $node_standby->safe_psql(
 		'testdb',
-		qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
+		qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
 	),
 	't',
 	'replication slot stats not removed after invalidation');
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 4286c266e17..9801c66fba8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2160,6 +2160,7 @@ pg_stat_replication| SELECT s.pid,
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
+    r.plugin,
     s.spill_txns,
     s.spill_count,
     s.spill_bytes,
@@ -2167,13 +2168,16 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_count,
     s.stream_bytes,
     s.mem_exceeded_count,
-    s.total_txns,
-    s.total_bytes,
+    s.total_wal_txns,
+    s.total_wal_bytes,
+    s.filtered_bytes,
+    s.sent_txns,
+    s.sent_bytes,
     s.slotsync_skip_count,
     s.slotsync_last_skip,
     s.stats_reset
    FROM pg_replication_slots r,
-    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, slotsync_skip_count, slotsync_last_skip, stats_reset)
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_wal_txns, total_wal_bytes, filtered_bytes, sent_txns, sent_bytes, slotsync_skip_count, slotsync_last_skip, stats_reset)
   WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT name,
     blks_zeroed,
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 430c1246d14..68501aa6ad5 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -124,6 +124,9 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
 is($result, qq(1002), 'check initial data was copied to subscriber');
 
+my $initial_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
@@ -157,6 +160,14 @@ $node_publisher->safe_psql('postgres',
 
 $node_publisher->wait_for_catchup('tap_sub');
 
+# Verify that filtered_bytes increased due to filtered update and delete
+# operations on tab_ins.  We cannot test the exact value since it may include
+# changes from other concurrent transactions.
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+	'filtered_bytes increased after DML filtering');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_ins");
 is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
index 3d16c2a800d..011c931dbd3 100644
--- a/src/test/subscription/t/010_truncate.pl
+++ b/src/test/subscription/t/010_truncate.pl
@@ -69,6 +69,9 @@ $node_subscriber->safe_psql('postgres',
 # Wait for initial sync of all subscriptions
 $node_subscriber->wait_for_subscription_sync;
 
+my $initial_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'");
+
 # insert data to truncate
 
 $node_subscriber->safe_psql('postgres',
@@ -98,6 +101,16 @@ $node_publisher->wait_for_catchup('sub1');
 $result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')");
 is($result, qq(101), 'truncate restarted identities');
 
+# All the DMLs above happen on tables that are subscribed to by sub1 and not
+# sub2. filtered_bytes should get incremented for replication slot
+# corresponding to the subscription sub2. We can not test the exact value of
+# filtered_bytes because the counter is affected by background activity.
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+	'filtered_bytes increased after publication level filtering');
+$initial_filtered_bytes = $final_filtered_bytes;
+
 # test publication that does not replicate truncate
 
 $node_subscriber->safe_psql('postgres',
@@ -107,6 +120,13 @@ $node_publisher->safe_psql('postgres', "TRUNCATE tab2");
 
 $node_publisher->wait_for_catchup('sub2');
 
+# Truncate changes are filtered out at publication level itself. Make sure that
+# the filtered_bytes is incremented.
+$final_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+	'filtered_bytes increased after truncate filtering');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab2");
 is($result, qq(3|1|3), 'truncate not replicated');
diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl
index e2c83670053..b772676d6bc 100644
--- a/src/test/subscription/t/028_row_filter.pl
+++ b/src/test/subscription/t/028_row_filter.pl
@@ -579,6 +579,9 @@ is($result, qq(3|6),
 # commands are for testing normal logical replication behavior.
 #
 # test row filter (INSERT, UPDATE, DELETE)
+my $initial_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')");
 $node_publisher->safe_psql('postgres',
@@ -612,6 +615,14 @@ $node_publisher->safe_psql('postgres',
 
 $node_publisher->wait_for_catchup($appname);
 
+# The changes which do not pass the row filter will be filtered. Make sure that
+# the filtered_bytes reflects that. We can not test the exact value of
+# filtered_bytes since it is affected by background activity.
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres',
+	"SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+	'filtered_bytes increased after row filtering');
+
 # Check expected replicated rows for tab_rowfilter_2
 # tap_pub_1 filter is: (c % 2 = 0)
 # tap_pub_2 filter is: (c % 3 = 0)

base-commit: 1362bc33e025fd2848ff38558f5672e2f0f0c7de
-- 
2.34.1

Reply via email to