diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 0c02e46..ee36c3b 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -3046,6 +3046,60 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_commit</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_commit_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of transactions data successfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_error</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions unsuccessfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_error_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of transactions data unsuccessfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_abort</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_abort_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of transactions data aborted in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>latest_end_time</structfield> <type>timestamp with time zone</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 449692a..109ba92 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -909,10 +909,37 @@ CREATE VIEW pg_stat_subscription AS
             st.last_msg_send_time,
             st.last_msg_receipt_time,
             st.latest_end_lsn,
+            pg_stat_get_subscription_xact_commit(su.oid, st.relid) AS xact_commit,
+            pg_stat_get_subscription_xact_commit_bytes(su.oid, st.relid) AS xact_commit_bytes,
+            pg_stat_get_subscription_xact_error(su.oid, st.relid) AS xact_error,
+            pg_stat_get_subscription_xact_error_bytes(su.oid, st.relid) AS xact_error_bytes,
+            pg_stat_get_subscription_xact_abort(su.oid, st.relid) AS xact_abort,
+            pg_stat_get_subscription_xact_abort_bytes(su.oid, st.relid) AS xact_abort_bytes,
             st.latest_end_time
     FROM pg_subscription su
-            LEFT JOIN pg_stat_get_subscription(NULL) st
-                      ON (st.subid = su.oid);
+            LEFT JOIN
+	    (SELECT
+		s.subid,
+		s.relid,
+		s.pid,
+		s.received_lsn,
+		s.last_msg_send_time,
+		s.last_msg_receipt_time,
+		s.latest_end_lsn,
+		s.latest_end_time
+	    FROM pg_stat_get_subscription(NULL) s
+	    UNION -- acquire relid of table sync worker
+	    SELECT
+		r.srsubid,
+		r.srrelid,
+		NULL as pid,
+		NULL as received_lsn,
+		NULL as last_msg_send_time,
+		NULL as last_msg_receipt_time,
+		NULL as latest_end_lsn,
+		NULL as latest_end_time
+	    FROM pg_subscription_rel r WHERE r.srsubstate <> 'r') st
+		ON (st.subid = su.oid);
 
 CREATE VIEW pg_stat_ssl AS
     SELECT
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 778e409..bca3c41 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -55,6 +55,7 @@
 #include "postmaster/postmaster.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
+#include "replication/logicalworker.h"
 #include "storage/backendid.h"
 #include "storage/dsm.h"
 #include "storage/fd.h"
@@ -378,6 +379,7 @@ static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len
 static void pgstat_recv_connstat(PgStat_MsgConn *msg, int len);
 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_subscription_success(PgStat_MsgSubscriptionErr *msg, int len);
 static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len);
 static void pgstat_recv_subscription_error_purge(PgStat_MsgSubscriptionErrPurge *msg,
 												 int len);
@@ -2038,6 +2040,40 @@ pgstat_report_replslot_drop(const char *slotname)
 }
 
 /* ----------
+ * pgstat_report_subscription_success() -
+ *
+ *  Tell the collector about the subscription success.
+ * ----------
+ */
+void
+pgstat_report_subscription_success(Oid subid, Oid subrel,
+								   LogicalRepMsgType command, PgStat_Counter bytes)
+{
+	PgStat_MsgSubscriptionErr msg;
+
+	Assert(command == 0 /* table sync worker */ ||
+		   command == LOGICAL_REP_MSG_COMMIT ||
+		   command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+		   command == LOGICAL_REP_MSG_STREAM_ABORT);
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONSUCCESS);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrel;
+	msg.m_clear = false;
+	msg.m_reset = false;
+	msg.m_databaseid = InvalidOid;
+	msg.m_relid = InvalidOid;
+	msg.m_command = command;
+	msg.m_xid = InvalidTransactionId;
+	msg.m_bytes = bytes;
+	msg.m_failure_time = GetCurrentTimestamp();
+	msg.m_errmsg[0] = '\0';
+	pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionErr));
+
+	reset_apply_error_context_xact_size();
+}
+
+/* ----------
  * pgstat_report_subscription_error() -
  *
  *	Tell the collector about the subscription error.
@@ -2063,10 +2099,13 @@ pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
 	msg.m_relid = relid;
 	msg.m_command = command;
 	msg.m_xid = xid;
+	msg.m_bytes = get_apply_error_context_xact_size();
 	msg.m_failure_time = GetCurrentTimestamp();
 	strlcpy(msg.m_errmsg, errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
 
 	pgstat_send(&msg, len);
+
+	reset_apply_error_context_xact_size();
 }
 
 /* ----------
@@ -3730,6 +3769,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_connstat(&msg.msg_conn, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBSCRIPTIONSUCCESS:
+					pgstat_recv_subscription_success(&msg.msg_subscriptionerr, len);
+					break;
+
 				case PGSTAT_MTYPE_SUBSCRIPTIONERR:
 					pgstat_recv_subscription_error(&msg.msg_subscriptionerr, len);
 					break;
@@ -6153,6 +6196,45 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 }
 
 /* ----------
+ * pgstat_recv_subscription_success() -
+ *
+ *  Process a SUBSCRIPTIONERR message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_success(PgStat_MsgSubscriptionErr *msg, int len)
+{
+	PgStat_StatSubErrEntry *errent;
+	bool        create = !(msg->m_reset || msg->m_clear);
+
+	errent = pgstat_get_subscription_error_entry(msg->m_subid,
+												 msg->m_subrelid,
+												 create);
+
+	/* msg from table sync worker */
+	if (msg->m_command == 0)
+		errent->xact_commit++;
+	else
+	{
+		switch(msg->m_command)
+		{
+			case LOGICAL_REP_MSG_COMMIT:
+			case LOGICAL_REP_MSG_COMMIT_PREPARED:
+				errent->xact_commit++;
+				errent->xact_commit_bytes += msg->m_bytes;
+				break;
+			case LOGICAL_REP_MSG_STREAM_ABORT:
+				errent->xact_abort++;
+				errent->xact_abort_bytes = msg->m_bytes;
+				break;
+			default:
+				elog(ERROR, "unexpected command type");
+		}
+	}
+	errent->last_failure = msg->m_failure_time;
+}
+
+/* ----------
  * pgstat_recv_subscription_error() -
  *
  *	Process a SUBSCRIPTIONERR message.
@@ -6202,6 +6284,8 @@ pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len)
 		errent->relid = msg->m_relid;
 		errent->command = msg->m_command;
 		errent->xid = msg->m_xid;
+		errent->xact_error++;
+		errent->xact_error_bytes += msg->m_bytes;
 		errent->failure_count++;
 		errent->last_failure = msg->m_failure_time;
 		strlcpy(errent->last_errmsg, msg->m_errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
@@ -6465,6 +6549,12 @@ pgstat_get_subscription_error_entry(Oid subid, Oid subrelid, bool create)
 		errent->relid = InvalidOid;
 		errent->command = 0;
 		errent->xid = InvalidTransactionId;
+		errent->xact_commit = 0;
+		errent->xact_commit_bytes = 0;
+		errent->xact_error = 0;
+		errent->xact_error_bytes = 0;
+		errent->xact_abort = 0;
+		errent->xact_abort_bytes = 0;
 		errent->failure_count = 0;
 		errent->last_failure = 0;
 		errent->last_errmsg[0] = '\0';
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b..64d18c6 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -17,6 +17,7 @@
 #include "catalog/pg_type.h"
 #include "libpq/pqformat.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalworker.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
@@ -872,6 +873,9 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 			case LOGICALREP_COLUMN_TEXT:
 				len = pq_getmsgint(in, 4);	/* read length */
 
+				/* accumulate data length for transaction stats */
+				add_apply_error_context_xact_size(len);
+
 				/* and data */
 				value->data = palloc(len + 1);
 				pq_copymsgbytes(in, value->data, len);
@@ -884,6 +888,9 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 			case LOGICALREP_COLUMN_BINARY:
 				len = pq_getmsgint(in, 4);	/* read length */
 
+				/* accumulate data length for transaction stats */
+				add_apply_error_context_xact_size(len);
+
 				/* and data */
 				value->data = palloc(len + 1);
 				pq_copymsgbytes(in, value->data, len);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a..7ba7486 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1150,6 +1150,15 @@ copy_table_done:
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
 	/*
+	 * Update the stats for table sync. We don't record the bytes of
+	 * table synchronization.
+	 */
+	pgstat_report_subscription_success(MyLogicalRepWorker->subid,
+									   MyLogicalRepWorker->relid,
+									   0, /* no corresponding logical message type */
+									   0);
+
+	/*
 	 * Finally, wait until the main apply worker tells us to catch up and then
 	 * return to let LogicalRepApplyLoop do it.
 	 */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 34ed8e4..a3f9ce3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -222,6 +222,9 @@ typedef struct ApplyErrorCallbackArg
 	LogicalRepMsgType command;	/* 0 if invalid */
 	LogicalRepRelMapEntry *rel;
 
+	/* data size of this transaction */
+	PgStat_Counter bytes;
+
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
 	TransactionId remote_xid;
@@ -232,6 +235,7 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 {
 	.command = 0,
 	.rel = NULL,
+	.bytes = 0,
 	.remote_attnum = -1,
 	.remote_xid = InvalidTransactionId,
 	.ts = 0,
@@ -863,6 +867,10 @@ apply_handle_commit(StringInfo s)
 	process_syncing_tables(commit_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
+	pgstat_report_subscription_success(MySubscription->oid,
+									   InvalidOid,
+									   LOGICAL_REP_MSG_COMMIT,
+									   apply_error_callback_arg.bytes);
 	reset_apply_error_context_info();
 }
 
@@ -1016,6 +1024,10 @@ apply_handle_commit_prepared(StringInfo s)
 	process_syncing_tables(prepare_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
+	pgstat_report_subscription_success(MySubscription->oid,
+									   InvalidOid,
+									   LOGICAL_REP_MSG_COMMIT_PREPARED,
+									   apply_error_callback_arg.bytes);
 	reset_apply_error_context_info();
 }
 
@@ -1350,6 +1362,10 @@ apply_handle_stream_abort(StringInfo s)
 	if (is_skipping_changes())
 		stop_skipping_changes(InvalidXLogRecPtr, 0);
 
+	pgstat_report_subscription_success(MySubscription->oid,
+									   InvalidOid,
+									   LOGICAL_REP_MSG_STREAM_ABORT,
+									   apply_error_callback_arg.bytes);
 	reset_apply_error_context_info();
 }
 
@@ -3380,6 +3396,9 @@ stream_write_change(char action, StringInfo s)
 	len = (s->len - s->cursor);
 
 	BufFileWrite(stream_fd, &s->data[s->cursor], len);
+
+	/* accumulate the bytes for stats */
+	add_apply_error_context_xact_size(len);
 }
 
 /*
@@ -3744,6 +3763,27 @@ set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
 	apply_error_callback_arg.ts = ts;
 }
 
+/* Exported so that stats collector can utilize this value */
+int64
+get_apply_error_context_xact_size(void)
+{
+	return apply_error_callback_arg.bytes;
+}
+
+/* Add size to apply error callback bytes */
+void
+add_apply_error_context_xact_size(int64 size)
+{
+	apply_error_callback_arg.bytes += size;
+}
+
+/* Reset bytes information of apply error callback */
+void
+reset_apply_error_context_xact_size(void)
+{
+	apply_error_callback_arg.bytes = 0;
+}
+
 /* Reset all information of apply error callback */
 static inline void
 reset_apply_error_context_info(void)
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index c454e2f..0b4ad21 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2492,3 +2492,136 @@ pg_stat_get_subscription_error(PG_FUNCTION_ARGS)
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
+
+Datum
+pg_stat_get_subscription_xact_commit(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_commit);
+
+	PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_commit_bytes(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_commit_bytes);
+
+	PG_RETURN_INT64(result);
+}
+
+
+Datum
+pg_stat_get_subscription_xact_error(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_error);
+
+	PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_error_bytes(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_error_bytes);
+
+	PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_abort(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_abort);
+
+	PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_abort_bytes(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_abort_bytes);
+
+	PG_RETURN_INT64(result);
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ea4e6d2..aeb935c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5393,6 +5393,30 @@
   proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{subid,relid,datid,subid,relid,command,xid,failure_source,failure_count,last_failure,last_failure_message,stats_reset}',
   prosrc => 'pg_stat_get_subscription_error' },
+{ oid => '8525', descr => 'statistics: number of transactions commit for a subscription',
+  proname => 'pg_stat_get_subscription_xact_commit', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_commit' },
+{ oid => '8526', descr => 'statistics: bytes of transactions commit for a subscription',
+  proname => 'pg_stat_get_subscription_xact_commit_bytes', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_commit_bytes' },
+{ oid => '8527', descr => 'statistics: number of transactions error for a subscription',
+  proname => 'pg_stat_get_subscription_xact_error', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_error' },
+{ oid => '8528', descr => 'statistics: bytes of transactions error for a subscription',
+  proname => 'pg_stat_get_subscription_xact_error_bytes', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_error_bytes' },
+{ oid => '8529', descr => 'statistics: number of transactions abort for a subscription',
+  proname => 'pg_stat_get_subscription_xact_abort', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_abort' },
+{ oid => '8530', descr => 'statistics: bytes of transactions abort for a subscription',
+  proname => 'pg_stat_get_subscription_xact_abort_bytes', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_abort_bytes' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 6775736..2a43135 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -68,6 +68,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_RESETSLRUCOUNTER,
 	PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
 	PGSTAT_MTYPE_SUBSCRIPTIONERR,
+	PGSTAT_MTYPE_SUBSCRIPTIONSUCCESS,
 	PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE,
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_AUTOVAC_START,
@@ -565,6 +566,7 @@ typedef struct PgStat_MsgSubscriptionErr
 	Oid			m_relid;
 	LogicalRepMsgType m_command;
 	TransactionId m_xid;
+	PgStat_Counter m_bytes;
 	TimestampTz m_failure_time;
 	char		m_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
 } PgStat_MsgSubscriptionErr;
@@ -1002,8 +1004,12 @@ typedef struct PgStat_StatSubEntry
  * (subrelid is InvalidOid) or by the table sync worker (subrelid is a valid OID).
  * The error reported by the table sync worker is removed also when the table
  * synchronization process completed.
+ *
+ * Other general stats for transaction on this subscription are stored in this entry
+ * as well, aligned with transaction error and its total bytes. Holding the size of
+ * computed xact_error_bytes during replay depends on the apply error callback and
+ * having general stats in the same place reduces code complexity.
  */
-
 typedef struct PgStat_StatSubErrEntry
 {
 	Oid			subrelid;		/* InvalidOid if the apply worker, otherwise
@@ -1014,7 +1020,16 @@ typedef struct PgStat_StatSubErrEntry
 								 * case. */
 	LogicalRepMsgType command;
 	TransactionId xid;
-	PgStat_Counter failure_count;
+
+	PgStat_Counter xact_commit;
+	PgStat_Counter xact_commit_bytes;
+	PgStat_Counter xact_error;
+	PgStat_Counter xact_error_bytes; /* total error counts of this
+										subscription */
+	PgStat_Counter xact_abort;
+	PgStat_Counter xact_abort_bytes;
+	PgStat_Counter failure_count; /* total error counts of one
+									 specific error */
 	TimestampTz last_failure;
 	char		last_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
 	TimestampTz stat_reset_timestamp;
@@ -1129,10 +1144,11 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subscription_success(Oid subid, Oid subrel,
+											   LogicalRepMsgType command, PgStat_Counter bytes);
 extern void pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
 											 LogicalRepMsgType command,
 											 TransactionId xid, const char *errmsg);
-
 extern void pgstat_initialize(void);
 
 
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 2ad61a0..923c8ce 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -15,5 +15,9 @@
 extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
+extern int64 get_apply_error_context_xact_size(void);
+extern void add_apply_error_context_xact_size(int64 size);
+extern void reset_apply_error_context_xact_size(void);
+
 
 #endif							/* LOGICALWORKER_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 66b185f..89c68f1 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2091,9 +2091,34 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.last_msg_send_time,
     st.last_msg_receipt_time,
     st.latest_end_lsn,
+    pg_stat_get_subscription_xact_commit(su.oid, st.relid) AS xact_commit,
+    pg_stat_get_subscription_xact_commit_bytes(su.oid, st.relid) AS xact_commit_bytes,
+    pg_stat_get_subscription_xact_error(su.oid, st.relid) AS xact_error,
+    pg_stat_get_subscription_xact_error_bytes(su.oid, st.relid) AS xact_error_bytes,
+    pg_stat_get_subscription_xact_abort(su.oid, st.relid) AS xact_abort,
+    pg_stat_get_subscription_xact_abort_bytes(su.oid, st.relid) AS xact_abort_bytes,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN ( SELECT s.subid,
+            s.relid,
+            s.pid,
+            s.received_lsn,
+            s.last_msg_send_time,
+            s.last_msg_receipt_time,
+            s.latest_end_lsn,
+            s.latest_end_time
+           FROM pg_stat_get_subscription(NULL::oid) s(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time)
+        UNION
+         SELECT r.srsubid,
+            r.srrelid,
+            NULL::integer AS pid,
+            NULL::pg_lsn AS received_lsn,
+            NULL::timestamp with time zone AS last_msg_send_time,
+            NULL::timestamp with time zone AS last_msg_receipt_time,
+            NULL::pg_lsn AS latest_end_lsn,
+            NULL::timestamp with time zone AS latest_end_time
+           FROM pg_subscription_rel r
+          WHERE (r.srsubstate <> 'r'::"char")) st ON ((st.subid = su.oid)));
 pg_stat_subscription_errors| SELECT d.datname,
     sr.subid,
     s.subname,
