diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 0c02e46..1ce9f5a 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -3046,6 +3046,64 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_commit</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_commit_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of transactions data successfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_error</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions unsuccessfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_error_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of transactions data unsuccessfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_abort</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_abort_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of transactions data aborted in this subscription.
+       Increase <literal>logical_decoding_work_mem</literal> on the publisher
+       to suppress unnecessary consumed network bandwidth or change in memory
+       of the subscriber, if unexpected amount of rollbacked transactions are
+       streamed.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>latest_end_time</structfield> <type>timestamp with time zone</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 449692a..109ba92 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -909,10 +909,37 @@ CREATE VIEW pg_stat_subscription AS
             st.last_msg_send_time,
             st.last_msg_receipt_time,
             st.latest_end_lsn,
+            pg_stat_get_subscription_xact_commit(su.oid, st.relid) AS xact_commit,
+            pg_stat_get_subscription_xact_commit_bytes(su.oid, st.relid) AS xact_commit_bytes,
+            pg_stat_get_subscription_xact_error(su.oid, st.relid) AS xact_error,
+            pg_stat_get_subscription_xact_error_bytes(su.oid, st.relid) AS xact_error_bytes,
+            pg_stat_get_subscription_xact_abort(su.oid, st.relid) AS xact_abort,
+            pg_stat_get_subscription_xact_abort_bytes(su.oid, st.relid) AS xact_abort_bytes,
             st.latest_end_time
     FROM pg_subscription su
-            LEFT JOIN pg_stat_get_subscription(NULL) st
-                      ON (st.subid = su.oid);
+            LEFT JOIN
+	    (SELECT
+		s.subid,
+		s.relid,
+		s.pid,
+		s.received_lsn,
+		s.last_msg_send_time,
+		s.last_msg_receipt_time,
+		s.latest_end_lsn,
+		s.latest_end_time
+	    FROM pg_stat_get_subscription(NULL) s
+	    UNION -- acquire relid of table sync worker
+	    SELECT
+		r.srsubid,
+		r.srrelid,
+		NULL as pid,
+		NULL as received_lsn,
+		NULL as last_msg_send_time,
+		NULL as last_msg_receipt_time,
+		NULL as latest_end_lsn,
+		NULL as latest_end_time
+	    FROM pg_subscription_rel r WHERE r.srsubstate <> 'r') st
+		ON (st.subid = su.oid);
 
 CREATE VIEW pg_stat_ssl AS
     SELECT
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 5c723bc..79a25bd 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -194,6 +194,17 @@ static void find_matching_subplans_recurse(PartitionPruningData *prunedata,
 
 
 /*
+ * PartitionTupleRoutingSize - exported to calculate total data size
+ * of logical replication mesage apply, since it is one of the
+ * ApplyExecutionData struct members.
+ */
+size_t
+PartitionTupleRoutingSize(void)
+{
+	return sizeof(PartitionTupleRouting);
+}
+
+/*
  * ExecSetupPartitionTupleRouting - sets up information needed during
  * tuple routing for partitioned tables, encapsulates it in
  * PartitionTupleRouting, and returns it.
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 778e409..bca3c41 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -55,6 +55,7 @@
 #include "postmaster/postmaster.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
+#include "replication/logicalworker.h"
 #include "storage/backendid.h"
 #include "storage/dsm.h"
 #include "storage/fd.h"
@@ -378,6 +379,7 @@ static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len
 static void pgstat_recv_connstat(PgStat_MsgConn *msg, int len);
 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_subscription_success(PgStat_MsgSubscriptionErr *msg, int len);
 static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len);
 static void pgstat_recv_subscription_error_purge(PgStat_MsgSubscriptionErrPurge *msg,
 												 int len);
@@ -2038,6 +2040,40 @@ pgstat_report_replslot_drop(const char *slotname)
 }
 
 /* ----------
+ * pgstat_report_subscription_success() -
+ *
+ *  Tell the collector about the subscription success.
+ * ----------
+ */
+void
+pgstat_report_subscription_success(Oid subid, Oid subrel,
+								   LogicalRepMsgType command, PgStat_Counter bytes)
+{
+	PgStat_MsgSubscriptionErr msg;
+
+	Assert(command == 0 /* table sync worker */ ||
+		   command == LOGICAL_REP_MSG_COMMIT ||
+		   command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+		   command == LOGICAL_REP_MSG_STREAM_ABORT);
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONSUCCESS);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrel;
+	msg.m_clear = false;
+	msg.m_reset = false;
+	msg.m_databaseid = InvalidOid;
+	msg.m_relid = InvalidOid;
+	msg.m_command = command;
+	msg.m_xid = InvalidTransactionId;
+	msg.m_bytes = bytes;
+	msg.m_failure_time = GetCurrentTimestamp();
+	msg.m_errmsg[0] = '\0';
+	pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionErr));
+
+	reset_apply_error_context_xact_size();
+}
+
+/* ----------
  * pgstat_report_subscription_error() -
  *
  *	Tell the collector about the subscription error.
@@ -2063,10 +2099,13 @@ pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
 	msg.m_relid = relid;
 	msg.m_command = command;
 	msg.m_xid = xid;
+	msg.m_bytes = get_apply_error_context_xact_size();
 	msg.m_failure_time = GetCurrentTimestamp();
 	strlcpy(msg.m_errmsg, errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
 
 	pgstat_send(&msg, len);
+
+	reset_apply_error_context_xact_size();
 }
 
 /* ----------
@@ -3730,6 +3769,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_connstat(&msg.msg_conn, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBSCRIPTIONSUCCESS:
+					pgstat_recv_subscription_success(&msg.msg_subscriptionerr, len);
+					break;
+
 				case PGSTAT_MTYPE_SUBSCRIPTIONERR:
 					pgstat_recv_subscription_error(&msg.msg_subscriptionerr, len);
 					break;
@@ -6153,6 +6196,45 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 }
 
 /* ----------
+ * pgstat_recv_subscription_success() -
+ *
+ *  Process a SUBSCRIPTIONERR message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_success(PgStat_MsgSubscriptionErr *msg, int len)
+{
+	PgStat_StatSubErrEntry *errent;
+	bool        create = !(msg->m_reset || msg->m_clear);
+
+	errent = pgstat_get_subscription_error_entry(msg->m_subid,
+												 msg->m_subrelid,
+												 create);
+
+	/* msg from table sync worker */
+	if (msg->m_command == 0)
+		errent->xact_commit++;
+	else
+	{
+		switch(msg->m_command)
+		{
+			case LOGICAL_REP_MSG_COMMIT:
+			case LOGICAL_REP_MSG_COMMIT_PREPARED:
+				errent->xact_commit++;
+				errent->xact_commit_bytes += msg->m_bytes;
+				break;
+			case LOGICAL_REP_MSG_STREAM_ABORT:
+				errent->xact_abort++;
+				errent->xact_abort_bytes = msg->m_bytes;
+				break;
+			default:
+				elog(ERROR, "unexpected command type");
+		}
+	}
+	errent->last_failure = msg->m_failure_time;
+}
+
+/* ----------
  * pgstat_recv_subscription_error() -
  *
  *	Process a SUBSCRIPTIONERR message.
@@ -6202,6 +6284,8 @@ pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len)
 		errent->relid = msg->m_relid;
 		errent->command = msg->m_command;
 		errent->xid = msg->m_xid;
+		errent->xact_error++;
+		errent->xact_error_bytes += msg->m_bytes;
 		errent->failure_count++;
 		errent->last_failure = msg->m_failure_time;
 		strlcpy(errent->last_errmsg, msg->m_errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
@@ -6465,6 +6549,12 @@ pgstat_get_subscription_error_entry(Oid subid, Oid subrelid, bool create)
 		errent->relid = InvalidOid;
 		errent->command = 0;
 		errent->xid = InvalidTransactionId;
+		errent->xact_commit = 0;
+		errent->xact_commit_bytes = 0;
+		errent->xact_error = 0;
+		errent->xact_error_bytes = 0;
+		errent->xact_abort = 0;
+		errent->xact_abort_bytes = 0;
 		errent->failure_count = 0;
 		errent->last_failure = 0;
 		errent->last_errmsg[0] = '\0';
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b..2e1fc94 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -17,6 +17,7 @@
 #include "catalog/pg_type.h"
 #include "libpq/pqformat.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalworker.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a..7ba7486 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1150,6 +1150,15 @@ copy_table_done:
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
 	/*
+	 * Update the stats for table sync. We don't record the bytes of
+	 * table synchronization.
+	 */
+	pgstat_report_subscription_success(MyLogicalRepWorker->subid,
+									   MyLogicalRepWorker->relid,
+									   0, /* no corresponding logical message type */
+									   0);
+
+	/*
 	 * Finally, wait until the main apply worker tells us to catch up and then
 	 * return to let LogicalRepApplyLoop do it.
 	 */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 34ed8e4..7eca5df 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -222,6 +222,20 @@ typedef struct ApplyErrorCallbackArg
 	LogicalRepMsgType command;	/* 0 if invalid */
 	LogicalRepRelMapEntry *rel;
 
+	/*
+	 * Store data size of this transaction.
+	 *
+	 * The byte size of transaction on the publisher is calculated
+	 * by ReorderBufferChangeSize() based on the ReorderBufferChange
+	 * structure. But on the subscriber, consumed resources are
+	 * not same as the publisher's decoding process and necessary
+	 * to compute those in different way. Then, the exact same byte
+	 * size is not restored on the subscriber usually. Further, in
+	 * order to give accurate bytes even in the case of error, add
+	 * byte size to this value step by step.
+	 */
+	PgStat_Counter bytes;
+
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
 	TransactionId remote_xid;
@@ -232,6 +246,7 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 {
 	.command = 0,
 	.rel = NULL,
+	.bytes = 0,
 	.remote_attnum = -1,
 	.remote_xid = InvalidTransactionId,
 	.ts = 0,
@@ -328,6 +343,7 @@ static void maybe_reread_subscription(void);
 
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
+void update_apply_change_size(LogicalRepMsgType action, void *data);
 
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
@@ -863,6 +879,10 @@ apply_handle_commit(StringInfo s)
 	process_syncing_tables(commit_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
+	pgstat_report_subscription_success(MySubscription->oid,
+									   InvalidOid,
+									   LOGICAL_REP_MSG_COMMIT,
+									   apply_error_callback_arg.bytes);
 	reset_apply_error_context_info();
 }
 
@@ -971,6 +991,9 @@ apply_handle_prepare(StringInfo s)
 
 	in_remote_transaction = false;
 
+	/* Update the size of change in memory of this transaction  */
+	update_apply_change_size(LOGICAL_REP_MSG_PREPARE, NULL);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
@@ -1012,10 +1035,17 @@ apply_handle_commit_prepared(StringInfo s)
 	store_flush_position(prepare_data.end_lsn);
 	in_remote_transaction = false;
 
+	/* Update the size of change in memory of this transaction  */
+	update_apply_change_size(LOGICAL_REP_MSG_COMMIT_PREPARED, NULL);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
+	pgstat_report_subscription_success(MySubscription->oid,
+									   InvalidOid,
+									   LOGICAL_REP_MSG_COMMIT_PREPARED,
+									   apply_error_callback_arg.bytes);
 	reset_apply_error_context_info();
 }
 
@@ -1198,6 +1228,10 @@ apply_handle_stream_start(StringInfo s)
 		oldctx = MemoryContextSwitchTo(ApplyContext);
 
 		stream_fileset = palloc(sizeof(FileSet));
+
+		/* Update the size of change in memory of this transaction  */
+		update_apply_change_size(LOGICAL_REP_MSG_STREAM_START, NULL);
+
 		FileSetInit(stream_fileset);
 
 		MemoryContextSwitchTo(oldctx);
@@ -1350,6 +1384,10 @@ apply_handle_stream_abort(StringInfo s)
 	if (is_skipping_changes())
 		stop_skipping_changes(InvalidXLogRecPtr, 0);
 
+	pgstat_report_subscription_success(MySubscription->oid,
+									   InvalidOid,
+									   LOGICAL_REP_MSG_STREAM_ABORT,
+									   apply_error_callback_arg.bytes);
 	reset_apply_error_context_info();
 }
 
@@ -1498,6 +1536,10 @@ apply_handle_stream_commit(StringInfo s)
 		stop_skipping_changes(commit_data.end_lsn, commit_data.committime);
 
 		store_flush_position(commit_data.end_lsn);
+
+		/* Update the size of change in memory of this transaction  */
+		update_apply_change_size(LOGICAL_REP_MSG_STREAM_COMMIT, NULL);
+
 		in_remote_transaction = false;
 	}
 	else
@@ -1650,6 +1692,9 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update the size of change in memory of this transaction  */
+	update_apply_change_size(LOGICAL_REP_MSG_INSERT, rel);
+
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -1807,6 +1852,9 @@ apply_handle_update(StringInfo s)
 					has_oldtup ? &oldtup : &newtup);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update the size of change in memory of this transaction  */
+	update_apply_change_size(LOGICAL_REP_MSG_INSERT, rel);
+
 	/* For a partitioned table, apply update to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -1941,6 +1989,9 @@ apply_handle_delete(StringInfo s)
 	slot_store_data(remoteslot, rel, &oldtup);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update the size of change in memory of this transaction  */
+	update_apply_change_size(LOGICAL_REP_MSG_INSERT, rel);
+
 	/* For a partitioned table, apply delete to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -2503,6 +2554,133 @@ apply_dispatch(StringInfo s)
 }
 
 /*
+ * Subscriber side implementation equivalent to ReorderBufferChangeSize
+ * on the publisher.
+ *
+ * According to the logical replication message type, record major
+ * resource consumptions of this subscription for each message.
+ * At present, do not collect data from generic functions to keep
+ * code simplicity. Also, by adding multiple values at once,
+ * reduce function calls of this function itself.
+ *
+ * 'data' controls detail handling of data size calculation.
+ */
+void
+update_apply_change_size(LogicalRepMsgType action, void *data)
+{
+	int64		size = 0;
+	LogicalRepRelMapEntry *relmapentry;
+	LogicalRepRelation *reprelation;
+	int			*stream_write_len;
+
+	/*
+	 * In streaming mode, stream_write_change is called
+	 * instead of immediate apply. List up the messages types
+	 * that can be caught by handle_streamed_transaction and
+	 * treat the write length as the size of transaction so
+	 * that we can export it as part of pg_stat_subscription.
+	 */
+	if (in_streamed_transaction &&
+		(action == LOGICAL_REP_MSG_RELATION ||
+		 action == LOGICAL_REP_MSG_TYPE ||
+		 action == LOGICAL_REP_MSG_INSERT ||
+		 action == LOGICAL_REP_MSG_UPDATE ||
+		 action == LOGICAL_REP_MSG_DELETE ||
+		 action == LOGICAL_REP_MSG_TRUNCATE))
+	{
+		stream_write_len = (int *) data;
+		size += *stream_write_len;
+		add_apply_error_context_xact_size(size);
+		return;
+	}
+
+	switch (action)
+	{
+		/* Follow the same order as in the apply_dispatch */
+		case LOGICAL_REP_MSG_BEGIN:
+		case LOGICAL_REP_MSG_COMMIT:
+			break;
+
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+			Assert(data != NULL);
+
+			/*
+			 * Compute size based on ApplyExecutionData.
+			 * The size of LogicalRepRelMapEntry can be skipped because
+			 * it is obtained from hash_search in logicalrep_rel_open.
+			 */
+			size += sizeof(ApplyExecutionData) + sizeof(EState) +
+				sizeof(ResultRelInfo) + sizeof(ResultRelInfo);
+
+			/*
+			 * Add some extra size if the target relation is partitioned.
+			 * PartitionTupleRouting isn't exported so call the function
+			 * that returns its size instead.
+			 */
+			relmapentry = (LogicalRepRelMapEntry *) data;
+			if (relmapentry->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+				size += sizeof(ModifyTableState) + PartitionTupleRoutingSize();
+			break;
+
+		case LOGICAL_REP_MSG_TRUNCATE:
+			/* No special consumption except for generic functions */
+			break;
+
+		case LOGICAL_REP_MSG_RELATION:
+			Assert(data != NULL);
+
+			reprelation = (LogicalRepRelation *) data;
+			/* See logicalrep_read_attrs() for the last two */
+			size += sizeof(LogicalRepRelation) +
+				reprelation->natts * sizeof(char *) +
+				reprelation->natts * sizeof(Oid);
+			break;
+
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_ORIGIN:
+		case LOGICAL_REP_MSG_MESSAGE:
+			break;
+
+		case LOGICAL_REP_MSG_STREAM_START:
+			size += sizeof(FileSet);
+			break;
+
+		case LOGICAL_REP_MSG_STREAM_STOP:
+		case LOGICAL_REP_MSG_STREAM_ABORT:
+			break;
+
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
+			size += sizeof(FlushPosition);
+			break;
+
+		case LOGICAL_REP_MSG_BEGIN_PREPARE:
+		case LOGICAL_REP_MSG_PREPARE:
+			break;
+
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+			size += sizeof(FlushPosition);
+			break;
+
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+			break;
+
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+			size += sizeof(FlushPosition);
+			break;
+
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid logical replication message type \"%c\"", action)));
+	}
+
+	/* update the total size of consumption */
+	add_apply_error_context_xact_size(size);
+}
+
+/*
  * Figure out which write/flush positions to report to the walsender process.
  *
  * We can't simply report back the last LSN the walsender sent us because the
@@ -3362,6 +3540,7 @@ static void
 stream_write_change(char action, StringInfo s)
 {
 	int			len;
+	int			total_len;
 
 	Assert(in_streamed_transaction);
 	Assert(TransactionIdIsValid(stream_xid));
@@ -3380,6 +3559,10 @@ stream_write_change(char action, StringInfo s)
 	len = (s->len - s->cursor);
 
 	BufFileWrite(stream_fd, &s->data[s->cursor], len);
+
+	/* accumulate the total bytes processed in this function */
+	total_len = (s->len - s->cursor) * 2 + sizeof(char) + sizeof(action);
+	update_apply_change_size(action, &total_len);
 }
 
 /*
@@ -3744,6 +3927,27 @@ set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
 	apply_error_callback_arg.ts = ts;
 }
 
+/* Exported so that stats collector can utilize this value */
+int64
+get_apply_error_context_xact_size(void)
+{
+	return apply_error_callback_arg.bytes;
+}
+
+/* Add size to apply error callback bytes */
+void
+add_apply_error_context_xact_size(int64 size)
+{
+	apply_error_callback_arg.bytes += size;
+}
+
+/* Reset bytes information of apply error callback */
+void
+reset_apply_error_context_xact_size(void)
+{
+	apply_error_callback_arg.bytes = 0;
+}
+
 /* Reset all information of apply error callback */
 static inline void
 reset_apply_error_context_info(void)
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index c454e2f..0b4ad21 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2492,3 +2492,136 @@ pg_stat_get_subscription_error(PG_FUNCTION_ARGS)
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
+
+Datum
+pg_stat_get_subscription_xact_commit(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_commit);
+
+	PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_commit_bytes(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_commit_bytes);
+
+	PG_RETURN_INT64(result);
+}
+
+
+Datum
+pg_stat_get_subscription_xact_error(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_error);
+
+	PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_error_bytes(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_error_bytes);
+
+	PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_abort(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_abort);
+
+	PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_abort_bytes(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	int64		result;
+	PgStat_StatSubErrEntry *errent;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription error entry */
+	if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (errent->xact_abort_bytes);
+
+	PG_RETURN_INT64(result);
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ea4e6d2..aeb935c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5393,6 +5393,30 @@
   proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{subid,relid,datid,subid,relid,command,xid,failure_source,failure_count,last_failure,last_failure_message,stats_reset}',
   prosrc => 'pg_stat_get_subscription_error' },
+{ oid => '8525', descr => 'statistics: number of transactions commit for a subscription',
+  proname => 'pg_stat_get_subscription_xact_commit', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_commit' },
+{ oid => '8526', descr => 'statistics: bytes of transactions commit for a subscription',
+  proname => 'pg_stat_get_subscription_xact_commit_bytes', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_commit_bytes' },
+{ oid => '8527', descr => 'statistics: number of transactions error for a subscription',
+  proname => 'pg_stat_get_subscription_xact_error', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_error' },
+{ oid => '8528', descr => 'statistics: bytes of transactions error for a subscription',
+  proname => 'pg_stat_get_subscription_xact_error_bytes', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_error_bytes' },
+{ oid => '8529', descr => 'statistics: number of transactions abort for a subscription',
+  proname => 'pg_stat_get_subscription_xact_abort', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_abort' },
+{ oid => '8530', descr => 'statistics: bytes of transactions abort for a subscription',
+  proname => 'pg_stat_get_subscription_xact_abort_bytes', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_get_subscription_xact_abort_bytes' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index 694e38b..773e46c 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -110,6 +110,7 @@ typedef struct PartitionPruneState
 	PartitionPruningData *partprunedata[FLEXIBLE_ARRAY_MEMBER];
 } PartitionPruneState;
 
+extern size_t PartitionTupleRoutingSize(void);
 extern PartitionTupleRouting *ExecSetupPartitionTupleRouting(EState *estate,
 															 Relation rel);
 extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 6775736..7fa53ed 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -67,6 +67,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_RESETSINGLECOUNTER,
 	PGSTAT_MTYPE_RESETSLRUCOUNTER,
 	PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
+	PGSTAT_MTYPE_SUBSCRIPTIONSUCCESS,
 	PGSTAT_MTYPE_SUBSCRIPTIONERR,
 	PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE,
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
@@ -565,6 +566,7 @@ typedef struct PgStat_MsgSubscriptionErr
 	Oid			m_relid;
 	LogicalRepMsgType m_command;
 	TransactionId m_xid;
+	PgStat_Counter m_bytes;
 	TimestampTz m_failure_time;
 	char		m_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
 } PgStat_MsgSubscriptionErr;
@@ -1002,6 +1004,11 @@ typedef struct PgStat_StatSubEntry
  * (subrelid is InvalidOid) or by the table sync worker (subrelid is a valid OID).
  * The error reported by the table sync worker is removed also when the table
  * synchronization process completed.
+ *
+ * Other general stats for transaction on subscription are stored in this entry
+ * as well, aligned with transaction error and its total bytes. Holding the size of
+ * computed xact_error_bytes during replay depends on the apply error callback and
+ * having general stats in the same place reduces code complexity.
  */
 
 typedef struct PgStat_StatSubErrEntry
@@ -1014,7 +1021,18 @@ typedef struct PgStat_StatSubErrEntry
 								 * case. */
 	LogicalRepMsgType command;
 	TransactionId xid;
-	PgStat_Counter failure_count;
+
+	/* transaction stats of subscription */
+	PgStat_Counter xact_commit;
+	PgStat_Counter xact_commit_bytes;
+	PgStat_Counter xact_error;
+	PgStat_Counter xact_error_bytes; /* total error counts of this
+										subscription */
+	PgStat_Counter xact_abort;
+	PgStat_Counter xact_abort_bytes;
+
+	PgStat_Counter failure_count; /* total error counts of one
+									 specific error */
 	TimestampTz last_failure;
 	char		last_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
 	TimestampTz stat_reset_timestamp;
@@ -1129,10 +1147,11 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subscription_success(Oid subid, Oid subrel,
+											   LogicalRepMsgType command, PgStat_Counter bytes);
 extern void pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
 											 LogicalRepMsgType command,
 											 TransactionId xid, const char *errmsg);
-
 extern void pgstat_initialize(void);
 
 
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 2ad61a0..923c8ce 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -15,5 +15,9 @@
 extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
+extern int64 get_apply_error_context_xact_size(void);
+extern void add_apply_error_context_xact_size(int64 size);
+extern void reset_apply_error_context_xact_size(void);
+
 
 #endif							/* LOGICALWORKER_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 66b185f..89c68f1 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2091,9 +2091,34 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.last_msg_send_time,
     st.last_msg_receipt_time,
     st.latest_end_lsn,
+    pg_stat_get_subscription_xact_commit(su.oid, st.relid) AS xact_commit,
+    pg_stat_get_subscription_xact_commit_bytes(su.oid, st.relid) AS xact_commit_bytes,
+    pg_stat_get_subscription_xact_error(su.oid, st.relid) AS xact_error,
+    pg_stat_get_subscription_xact_error_bytes(su.oid, st.relid) AS xact_error_bytes,
+    pg_stat_get_subscription_xact_abort(su.oid, st.relid) AS xact_abort,
+    pg_stat_get_subscription_xact_abort_bytes(su.oid, st.relid) AS xact_abort_bytes,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN ( SELECT s.subid,
+            s.relid,
+            s.pid,
+            s.received_lsn,
+            s.last_msg_send_time,
+            s.last_msg_receipt_time,
+            s.latest_end_lsn,
+            s.latest_end_time
+           FROM pg_stat_get_subscription(NULL::oid) s(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time)
+        UNION
+         SELECT r.srsubid,
+            r.srrelid,
+            NULL::integer AS pid,
+            NULL::pg_lsn AS received_lsn,
+            NULL::timestamp with time zone AS last_msg_send_time,
+            NULL::timestamp with time zone AS last_msg_receipt_time,
+            NULL::pg_lsn AS latest_end_lsn,
+            NULL::timestamp with time zone AS latest_end_time
+           FROM pg_subscription_rel r
+          WHERE (r.srsubstate <> 'r'::"char")) st ON ((st.subid = su.oid)));
 pg_stat_subscription_errors| SELECT d.datname,
     sr.subid,
     s.subname,
