diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 8d4941c..284f215 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -629,7 +629,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
-      <entry>At least one row per subscription, showing about errors that
+      <entry>At least one row per subscription, showing about transaction statistics and error summary that
       occurred on subscription.
       See <link linkend="monitoring-pg-stat-subscription-workers">
       <structname>pg_stat_subscription_workers</structname></link> for details.
@@ -3052,9 +3052,9 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
   <para>
    The <structname>pg_stat_subscription_workers</structname> view will contain
-   one row per subscription error reported by workers applying logical
-   replication changes and workers handling the initial data copy of the
-   subscribed tables.
+   one row per subscription for transaction statistics and summary of the last
+   error reported by workers applying logical replication changes and workers
+   handling the initial data copy of the subscribed tables.
   </para>
 
   <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
@@ -3102,20 +3102,78 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>relid</structfield> <type>oid</type>
+       <structfield>last_error_relid</structfield> <type>oid</type>
       </para>
       <para>
        OID of the relation that the worker was processing when the
-       error occurred
+       last error occurred
       </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>command</structfield> <type>text</type>
+       <structfield>xact_commit</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_commit_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of transactions data successfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_error</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions unsuccessfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_error_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of transactions data unsuccessfully applied in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_abort</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_abort_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of transactions data aborted in this subscription.
+       Increase <literal>logical_decoding_work_mem</literal> on the publisher
+       to suppress unnecessary consumed network bandwidth or change in memory
+       of the subscriber, if unexpected amount of rollbacked transactions are
+       streamed.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_error_command</structfield> <type>text</type>
       </para>
       <para>
-       Name of command being applied when the error occurred.  This field
+       Name of last command being applied when the error occurred.  This field
        is always NULL if the error was reported during the initial data
        copy.
       </para></entry>
@@ -3123,10 +3181,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>xid</structfield> <type>xid</type>
+       <structfield>last_error_xid</structfield> <type>xid</type>
       </para>
       <para>
-       Transaction ID of the publisher node being applied when the error
+       Transaction ID of the publisher node being applied when the last error
        occurred.  This field is always NULL if the error was reported
        during the initial data copy.
       </para></entry>
@@ -3134,19 +3192,19 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>count</structfield> <type>uint8</type>
+       <structfield>last_error_count</structfield> <type>uint8</type>
       </para>
       <para>
-       Number of consecutive times the error occurred
+       Number of consecutive times the last error occurred
       </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>error_message</structfield> <type>text</type>
+       <structfield>last_error_message</structfield> <type>text</type>
       </para>
       <para>
-       The error message
+       The last error message
       </para></entry>
      </row>
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 95226b8..5f00053 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1263,11 +1263,17 @@ CREATE VIEW pg_stat_subscription_workers AS
 	e.subid,
 	s.subname,
 	e.subrelid,
-	e.relid,
-	e.command,
-	e.xid,
-	e.count,
-	e.error_message,
+	e.xact_commit,
+	e.xact_commit_bytes,
+	e.xact_error,
+	e.xact_error_bytes,
+	e.xact_abort,
+	e.xact_abort_bytes,
+	e.last_error_relid,
+	e.last_error_command,
+	e.last_error_xid,
+	e.last_error_count,
+	e.last_error_message,
 	e.last_error_time,
 	e.stats_reset
     FROM (SELECT
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 5c723bc..20184c6 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -192,6 +192,16 @@ static void find_matching_subplans_recurse(PartitionPruningData *prunedata,
 										   bool initial_prune,
 										   Bitmapset **validsubplans);
 
+/*
+ * PartitionTupleRoutingSize - exported to calculate total data size
+ * of logical replication mesage apply, because this is one of the
+ * ApplyExecutionData struct members.
+ */
+size_t
+PartitionTupleRoutingSize(void)
+{
+	return sizeof(PartitionTupleRouting);
+}
 
 /*
  * ExecSetupPartitionTupleRouting - sets up information needed during
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index d9a2048..95c8ece 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -54,6 +54,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/backendid.h"
@@ -287,6 +288,13 @@ static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
 static HTAB *subWorkerStatHash = NULL;
 
+/* Stats of prepared transactions should be displayed
+ * at either commit prepared or rollback prepared time, even when it's
+ * after the server restart. We have the apply worker send those statistics
+ * to the stats collector at exit and the startup process restore those at restart.
+ */
+static HTAB *subWorkerPreparedXactSizeHash = NULL;
+
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
  * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
@@ -338,6 +346,9 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp
 
 static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(Oid subid, Oid subrelid,
 															 bool create);
+static PgStat_StatSubWorkerPreparedXactSize *pgstat_get_subworker_prepared_txn(Oid subid,
+																			   char *gid, bool create);
+
 static void pgstat_reset_subworker_entry(PgStat_StatSubWorkerEntry *wentry, TimestampTz ts);
 static void pgstat_report_subscription_purge(PgStat_MsgSubscriptionPurge *msg);
 static void pgstat_report_subworker_purge(PgStat_MsgSubWorkerPurge *msg);
@@ -388,6 +399,9 @@ static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
 static void pgstat_recv_subworker_purge(PgStat_MsgSubWorkerPurge *msg, int len);
+static void pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len);
+static void pgstat_recv_subworker_twophase_xact(PgStat_MsgSubWorkerTwophaseXact *msg, int len);
+
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -2115,6 +2129,73 @@ pgstat_report_replslot_drop(const char *slotname)
 }
 
 /* ----------
+ * pgstat_report_subworker_xact_end() -
+ *
+ *  Tell the collector that worker transaction has finished without problem.
+ * ----------
+ */
+void
+pgstat_report_subworker_xact_end(Oid subid, Oid subrel,
+								 LogicalRepMsgType command, PgStat_Counter xact_size)
+{
+	PgStat_MsgSubWorkerXactEnd msg;
+
+	Assert(command == 0 /* table sync worker */ ||
+		   command == LOGICAL_REP_MSG_COMMIT ||
+		   command == LOGICAL_REP_MSG_STREAM_ABORT ||
+		   command == LOGICAL_REP_MSG_STREAM_COMMIT);
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERXACTEND);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrel;
+	msg.m_command = command;
+	msg.m_xact_bytes = xact_size;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerXactEnd));
+
+	reset_apply_error_context_xact_size();
+}
+
+/* ----------
+ * pgstat_report_subworker_twophase_xact() -
+ *
+ *  Tell the collector that worker transaction has done 2PC related operation.
+ * ----------
+ */
+void
+pgstat_report_subworker_twophase_xact(Oid subid, LogicalRepMsgType command,
+									  PgStat_Counter xact_size,
+									  LogicalRepPreparedTxnData *prepare_data,
+									  LogicalRepCommitPreparedTxnData *commit_data,
+									  LogicalRepRollbackPreparedTxnData *rollback_data)
+{
+	PgStat_MsgSubWorkerTwophaseXact msg;
+
+	Assert(command == LOGICAL_REP_MSG_PREPARE ||
+		   command == LOGICAL_REP_MSG_STREAM_PREPARE ||
+		   command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+		   command == LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+	/* setup the message */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT);
+	msg.m_subid = subid;
+	msg.m_command = command;
+
+	/* get the gid for this two phase operation */
+	if (command == LOGICAL_REP_MSG_PREPARE ||
+		command == LOGICAL_REP_MSG_STREAM_PREPARE)
+		memcpy(msg.m_gid, prepare_data->gid, GIDSIZE);
+	else if (command == LOGICAL_REP_MSG_COMMIT_PREPARED)
+		memcpy(msg.m_gid, commit_data->gid, GIDSIZE);
+	else /* rollback prepared */
+		memcpy(msg.m_gid, rollback_data->gid, GIDSIZE);
+
+	msg.m_xact_bytes = xact_size;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerTwophaseXact));
+
+	reset_apply_error_context_xact_size();
+}
+
+/* ----------
  * pgstat_report_subworker_error() -
  *
  *	Tell the collector about the subscription worker error.
@@ -2134,6 +2215,7 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR);
 	msg.m_subid = subid;
 	msg.m_subrelid = subrelid;
+	msg.m_xact_error_bytes = get_apply_error_context_xact_size();
 	msg.m_dbid = MyDatabaseId;
 	msg.m_relid = relid;
 	msg.m_command = command;
@@ -2142,6 +2224,8 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 	strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
 
 	pgstat_send(&msg, len);
+
+	reset_apply_error_context_xact_size();
 }
 
 /* ----------
@@ -3876,6 +3960,14 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subworker_purge(&msg.msg_subworkerpurge, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBWORKERXACTEND:
+					pgstat_recv_subworker_xact_end(&msg.msg_subworkerxactend, len);
+					break;
+
+				case PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT:
+					pgstat_recv_subworker_twophase_xact(&msg.msg_subworkertwophasexact, len);
+					break;
+
 				default:
 					break;
 			}
@@ -4193,6 +4285,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	}
 
 	/*
+	 * Write subscription worker's prepared transaction struct
+	 */
+	if (subWorkerPreparedXactSizeHash)
+	{
+		PgStat_StatSubWorkerPreparedXactSize *prepared_size;
+
+		hash_seq_init(&hstat, subWorkerPreparedXactSizeHash);
+		while((prepared_size = (PgStat_StatSubWorkerPreparedXactSize *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('P', fpout);
+			rc = fwrite(prepared_size, sizeof(PgStat_StatSubWorkerPreparedXactSize), 1, fpout);
+			(void) rc; 			/* we'll check for error with ferror */
+		}
+	}
+
+	/*
 	 * No more output to be done. Close the temp file and replace the old
 	 * pgstat.stat with it.  The ferror() check replaces testing for error
 	 * after each individual fputc or fwrite above.
@@ -4695,6 +4803,40 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					break;
 				}
 
+			case 'P':
+				{
+					PgStat_StatSubWorkerPreparedXactSize buff;
+					PgStat_StatSubWorkerPreparedXactSize *prepared_xact_size;
+
+					if (fread(&buff, 1, sizeof(PgStat_StatSubWorkerPreparedXactSize),
+							  fpin) != sizeof(PgStat_StatSubWorkerPreparedXactSize))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					if (subWorkerPreparedXactSizeHash == NULL)
+					{
+						HASHCTL		hash_ctl;
+
+						hash_ctl.keysize = sizeof(PgStat_StatSubWorkerPreparedXact);
+						hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerPreparedXactSize);
+						hash_ctl.hcxt = pgStatLocalContext;
+						subWorkerPreparedXactSizeHash = hash_create("Subscription worker stats of prepared txn",
+														PGSTAT_SUBWORKER_HASH_SIZE,
+														&hash_ctl,
+														HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+					}
+
+					prepared_xact_size =
+						(PgStat_StatSubWorkerPreparedXactSize *) hash_search(subWorkerPreparedXactSizeHash,
+																			 (void *) &buff.key,
+																			 HASH_ENTER, NULL);
+					memcpy(prepared_xact_size, &buff, sizeof(PgStat_StatSubWorkerPreparedXactSize));
+					break;
+				}
 			case 'E':
 				goto done;
 
@@ -5260,6 +5402,7 @@ pgstat_clear_snapshot(void)
 	pgStatDBHash = NULL;
 	replSlotStatHash = NULL;
 	subWorkerStatHash = NULL;
+	subWorkerPreparedXactSizeHash = NULL;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -6259,6 +6402,105 @@ pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
 }
 
 /* ----------
+ * pgstat_recv_subworker_xact_end() -
+ *
+ *	Process a SUBWORKERXACTEND message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len)
+{
+	PgStat_StatSubWorkerEntry *wentry;
+
+	wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, true);
+	Assert(wentry);
+
+	/* table sync worker */
+	if (msg->m_command == 0)
+		wentry->xact_commit++;
+	else
+	{
+		/* apply worker */
+		switch(msg->m_command)
+		{
+			case LOGICAL_REP_MSG_COMMIT:
+			case LOGICAL_REP_MSG_STREAM_COMMIT:
+				wentry->xact_commit++;
+				wentry->xact_commit_bytes += msg->m_xact_bytes;
+				break;
+			case LOGICAL_REP_MSG_STREAM_ABORT:
+				wentry->xact_abort++;
+				wentry->xact_abort_bytes += msg->m_xact_bytes;
+				break;
+			default:
+				elog(ERROR, "unexpected logical message type as normal apply end");
+				break;
+		}
+	}
+}
+
+/* ----------
+ * pgstat_recv_subworker_twophase_xact() -
+ *
+ *	Process a SUBWORKERTWOPHASEXACT message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_twophase_xact(PgStat_MsgSubWorkerTwophaseXact *msg, int len)
+{
+	PgStat_StatSubWorkerPreparedXactSize *prepared_txn;
+	PgStat_StatSubWorkerEntry *wentry;
+	PgStat_StatSubWorkerPreparedXact key;
+
+	prepared_txn = pgstat_get_subworker_prepared_txn(msg->m_subid, msg->m_gid, true);
+	Assert(prepared_txn);
+	switch(msg->m_command)
+	{
+		case LOGICAL_REP_MSG_PREPARE:
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+			/*
+			 * Make each size of prepared transaction persistent
+			 * so that we can update stats over the server restart
+			 * and make prepared stats updated when commit prepared
+			 * or rollback prepared arrives.
+			 */
+			prepared_txn->subid = msg->m_subid;
+			strlcpy(prepared_txn->gid, msg->m_gid, strlen(msg->m_gid) + 1);
+			prepared_txn->xact_size = msg->m_xact_bytes;
+			break;
+
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+			/* Update exported xact stats now */
+			wentry = pgstat_get_subworker_entry(msg->m_subid,
+												InvalidOid /* apply worker */,
+												true);
+			Assert(wentry);
+			if (msg->m_command == LOGICAL_REP_MSG_COMMIT_PREPARED)
+			{
+				wentry->xact_commit++;
+				wentry->xact_commit_bytes += prepared_txn->xact_size;
+			}
+			else
+			{
+				wentry->xact_abort++;
+				wentry->xact_abort_bytes += prepared_txn->xact_size;
+			}
+
+			/* Clean up this gid from transaction size hash */
+			key.subid = prepared_txn->subid;
+			memcpy(key.gid, msg->m_gid, strlen(msg->m_gid));
+			(void) hash_search(subWorkerPreparedXactSizeHash,
+							   (void *) &key, HASH_REMOVE, NULL);
+			break;
+
+		default:
+			elog(ERROR, "unexpected logical message type as prepare transaction");
+			break;
+	}
+}
+
+/* ----------
  * pgstat_recv_subworker_error() -
  *
  *	Process a SUBWORKERERROR message.
@@ -6273,6 +6515,10 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 	wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, true);
 	Assert(wentry);
 
+	/* general transaction stats for error */
+	wentry->xact_error++;
+	wentry->xact_error_bytes += msg->m_xact_error_bytes;
+
 	/*
 	 * Update only the counter and timestamp if we received the same error
 	 * again
@@ -6489,6 +6735,51 @@ pgstat_get_subworker_entry(Oid subid, Oid subrelid, bool create)
 }
 
 /* ----------
+ * pgstat_get_subworker_prepared_txn
+ *
+ * Return subscription worker entry with the given subscription OID and
+ * gid.
+ * ----------
+ */
+static PgStat_StatSubWorkerPreparedXactSize*
+pgstat_get_subworker_prepared_txn(Oid subid, char *gid, bool create)
+{
+	PgStat_StatSubWorkerPreparedXact key;
+	PgStat_StatSubWorkerPreparedXactSize *prepared_txn_size;
+	HASHACTION	action;
+	bool		found;
+
+	if (subWorkerPreparedXactSizeHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		hash_ctl.keysize = sizeof(PgStat_StatSubWorkerPreparedXact);
+		hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerPreparedXactSize);
+		hash_ctl.hcxt = pgStatLocalContext;
+		subWorkerPreparedXactSizeHash = hash_create("Subscription worker stats of prepared txn",
+													PGSTAT_SUBWORKER_HASH_SIZE,
+													&hash_ctl,
+													HASH_ELEM | HASH_STRINGS);
+	}
+
+	key.subid = subid;
+	memcpy(key.gid, gid, strlen(gid));
+	action = (create ? HASH_ENTER : HASH_FIND);
+	prepared_txn_size = (PgStat_StatSubWorkerPreparedXactSize *) hash_search(subWorkerPreparedXactSizeHash,
+																			 (void *) &key,
+																			 action, &found);
+
+	if (create && !found)
+	{
+		prepared_txn_size->subid = 0;
+		prepared_txn_size->gid[0] = '\0';
+		prepared_txn_size->xact_size = 0;
+	}
+
+	return prepared_txn_size;
+}
+
+/* ----------
  * pgstat_reset_subworker_entry
  *
  * Reset the given subscription worker statistics.
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a..aff78fd 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1149,6 +1149,12 @@ copy_table_done:
 	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	/* Report the success of table sync. */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 MyLogicalRepWorker->relid,
+									 0 /* no logical message type */,
+									 0 /* xact size */);
+
 	/*
 	 * Finally, wait until the main apply worker tells us to catch up and then
 	 * return to let LogicalRepApplyLoop do it.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d771a0c..6284310 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -222,6 +222,21 @@ typedef struct ApplyErrorCallbackArg
 	LogicalRepMsgType command;	/* 0 if invalid */
 	LogicalRepRelMapEntry *rel;
 
+	/*
+	 * Store data size of this transaction.
+	 *
+	 * The byte size of transaction on the publisher is calculated
+	 * by ReorderBufferChangeSize() based on the ReorderBufferChange
+	 * structure. But on the subscriber, consumed resources are
+	 * not same as the publisher's decoding processsing and required
+	 * to be computed in different way. Therefore, the exact same byte
+	 * size is not restored on the subscriber usually.
+	 *
+	 * Data size of streaming transactions is managed by streamingXactSize
+	 * for flexible data blocks handling.
+	 */
+	PgStat_Counter bytes;
+
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
 	TransactionId remote_xid;
@@ -232,11 +247,32 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 {
 	.command = 0,
 	.rel = NULL,
+	.bytes = 0,
 	.remote_attnum = -1,
 	.remote_xid = InvalidTransactionId,
 	.ts = 0,
 };
 
+/*
+ * Two or more streaming transactions in parallel on the publisher
+ * generate unexpected order of partial txn data demarcated stream start
+ * and stream stop to the subscriber, since whenever its size of one of
+ * the txns reaches the publisher's logical_decoding_work_mem,
+ * the part (and in the end, last remaining changes) is streamed.
+ * This creates mixed blocks of streaming data while
+ * there's possibility some are successfully committed but others are
+ * not by stream abort. Therefore, to track correct byte size
+ * it's necessary to trace each streaming transaction by making paring
+ * of xid and transaction size.
+ */
+#define PARALLEL_STREAMING_XACTS 32
+typedef struct XactSizeEntry
+{
+	TransactionId	key;
+	PgStat_Counter	xact_size;
+} XactSizeEntry;
+static HTAB *streamingXactSize = NULL;
+
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -320,6 +356,8 @@ static void maybe_reread_subscription(void);
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
+void update_apply_change_size(LogicalRepMsgType action, void *data);
+
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
@@ -853,6 +891,11 @@ apply_handle_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 InvalidOid,
+									 LOGICAL_REP_MSG_COMMIT,
+									 get_apply_error_context_xact_size());
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -965,6 +1008,11 @@ apply_handle_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_PREPARE,
+										  get_apply_error_context_xact_size(),
+										  &prepare_data, NULL, NULL);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1006,6 +1054,13 @@ apply_handle_commit_prepared(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	update_apply_change_size(LOGICAL_REP_MSG_COMMIT_PREPARED, NULL);
+
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_COMMIT_PREPARED,
+										  get_apply_error_context_xact_size(),
+										  NULL, &prepare_data, NULL);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1057,6 +1112,12 @@ apply_handle_rollback_prepared(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(rollback_data.rollback_end_lsn);
 
+	/* send rollback prepared message for this gid */
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_ROLLBACK_PREPARED,
+										  get_apply_error_context_xact_size(),
+										  NULL, NULL, &rollback_data);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1072,6 +1133,7 @@ static void
 apply_handle_stream_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData prepare_data;
+	XactSizeEntry *streamed_entry;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1116,6 +1178,19 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	/*
+	 * Report the prepared streaming xact size to the stats collector
+	 * in a prepared xact manner to make it survive over the restart.
+	 */
+	streamed_entry = hash_search(streamingXactSize,
+								 (void *) &prepare_data.xid,
+								 HASH_FIND, NULL);
+	Assert(streamed_entry);
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_STREAM_PREPARE,
+										  streamed_entry->xact_size,
+										  &prepare_data, NULL, NULL);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -1194,6 +1269,8 @@ apply_handle_stream_start(StringInfo s)
 		MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
 		FileSetInit(MyLogicalRepWorker->stream_fileset);
 
+		update_apply_change_size(LOGICAL_REP_MSG_STREAM_START, NULL);
+
 		MemoryContextSwitchTo(oldctx);
 	}
 
@@ -1244,12 +1321,17 @@ apply_handle_stream_stop(StringInfo s)
 
 /*
  * Handle STREAM abort message.
+ *
+ * Currently, abort of streaming subtransaction does not affect
+ * size of streaming transaction resources because it have used the
+ * resources anyway.
  */
 static void
 apply_handle_stream_abort(StringInfo s)
 {
 	TransactionId xid;
 	TransactionId subxid;
+	XactSizeEntry *streamed_entry;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1265,8 +1347,36 @@ apply_handle_stream_abort(StringInfo s)
 	 */
 	if (xid == subxid)
 	{
+		bool		found = false;
 		set_apply_error_context_xact(xid, 0);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+		/*
+		 * We've completed to handle stream abort without issue, so
+		 * get ready to report the transaction stats via normal
+		 * termination route instead of the apply error route.
+		 */
+		streamed_entry = hash_search(streamingXactSize,
+									 (void *) &xid,
+									 HASH_FIND, &found);
+		/*
+		 * It's possible that we get stream abort
+		 * earlier than any call of write_stream_change that
+		 * creates one hash entry for this xid. In this case,
+		 * to find a entry with this xid fails. So just check
+		 * if we've found it. Only when we confirm some writes
+		 * by write_stream_change, report the stream_abort.
+		 */
+		if (found)
+		{
+			pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+											 InvalidOid,
+											 LOGICAL_REP_MSG_STREAM_ABORT,
+											 streamed_entry->xact_size);
+			(void) hash_search(streamingXactSize,
+							   (void *) &xid,
+							   HASH_REMOVE, NULL);
+		}
 	}
 	else
 	{
@@ -1473,6 +1583,7 @@ apply_handle_stream_commit(StringInfo s)
 {
 	TransactionId xid;
 	LogicalRepCommitData commit_data;
+	XactSizeEntry *streamed_entry;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1508,6 +1619,19 @@ apply_handle_stream_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	/* Report and clean up the xid */
+	streamed_entry = hash_search(streamingXactSize,
+								 (void *) &xid,
+								 HASH_FIND, NULL);
+	Assert(streamed_entry);
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 InvalidOid,
+									 LOGICAL_REP_MSG_STREAM_COMMIT,
+									 streamed_entry->xact_size);
+	(void) hash_search(streamingXactSize,
+					   (void *) &xid,
+					   HASH_REMOVE, NULL);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -1646,6 +1770,8 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
+	update_apply_change_size(LOGICAL_REP_MSG_INSERT, rel);
+
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -1803,6 +1929,8 @@ apply_handle_update(StringInfo s)
 					has_oldtup ? &oldtup : &newtup);
 	MemoryContextSwitchTo(oldctx);
 
+	update_apply_change_size(LOGICAL_REP_MSG_UPDATE, rel);
+
 	/* For a partitioned table, apply update to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -1937,6 +2065,8 @@ apply_handle_delete(StringInfo s)
 	slot_store_data(remoteslot, rel, &oldtup);
 	MemoryContextSwitchTo(oldctx);
 
+	update_apply_change_size(LOGICAL_REP_MSG_DELETE, rel);
+
 	/* For a partitioned table, apply delete to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -2499,6 +2629,117 @@ apply_dispatch(StringInfo s)
 }
 
 /*
+ * Subscriber side implementation equivalent to ReorderBufferChangeSize
+ * of the publisher.
+ *
+ * According to the logical replication message type, record major
+ * resource consumptions of this subscription for each message.
+ * At present, do not collect data from generic functions to keep
+ * code simplicity, since the implementation complexity versus benefit
+ * tradeoff should not be good. Also, add multiple values
+ * at once in order to reduce the number of this function call.
+ *
+ * 'data' controls detail handling of data size calculation.
+ */
+void
+update_apply_change_size(LogicalRepMsgType action, void *data)
+{
+	int64       size = 0;
+	LogicalRepRelMapEntry *relmapentry;
+	LogicalRepRelation *reprelation;
+	int         *stream_write_len;
+
+	/*
+	 * In streaming mode, stream_write_change is called
+	 * instead of immediate apply. List up the messages types
+	 * that can be caught by handle_streamed_transaction and
+	 * treat the write length as the size of transaction so
+	 * that we can export it as part of pg_stat_subscription_worker.
+	 */
+	if (in_streamed_transaction &&
+		(action == LOGICAL_REP_MSG_INSERT ||
+		 action == LOGICAL_REP_MSG_UPDATE ||
+		 action == LOGICAL_REP_MSG_DELETE ||
+		 action == LOGICAL_REP_MSG_TRUNCATE ||
+		 action == LOGICAL_REP_MSG_RELATION ||
+		 action == LOGICAL_REP_MSG_TYPE))
+	{
+		stream_write_len = (int *) data;
+		size += *stream_write_len;
+		add_apply_error_context_xact_size(size);
+		return;
+	}
+
+	switch (action)
+	{
+		/* No special memory consumption */
+		case LOGICAL_REP_MSG_BEGIN:
+		case LOGICAL_REP_MSG_COMMIT:
+		case LOGICAL_REP_MSG_TRUNCATE:
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_ORIGIN:
+		case LOGICAL_REP_MSG_MESSAGE:
+		case LOGICAL_REP_MSG_STREAM_STOP:
+		case LOGICAL_REP_MSG_STREAM_ABORT:
+		case LOGICAL_REP_MSG_BEGIN_PREPARE:
+			break;
+
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+			Assert(data != NULL);
+
+			/*
+			 * Compute size based on ApplyExecutionData.
+			 * The size of LogicalRepRelMapEntry can be skipped because
+			 * it is obtained from hash_search in logicalrep_rel_open.
+			 */
+			size += sizeof(ApplyExecutionData) + sizeof(EState) +
+				sizeof(ResultRelInfo) + sizeof(ResultRelInfo);
+
+			/*
+			 * Add some extra size if the target relation is partitioned.
+			 * PartitionTupleRouting isn't exported. Therefore, call the
+			 * function that returns its size instead.
+			 */
+			relmapentry = (LogicalRepRelMapEntry *) data;
+			if (relmapentry->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+				size += sizeof(ModifyTableState) + PartitionTupleRoutingSize();
+			break;
+
+		case LOGICAL_REP_MSG_RELATION:
+			Assert(data != NULL);
+
+			reprelation = (LogicalRepRelation *) data;
+			/* See logicalrep_read_attrs for the last two */
+			size += sizeof(LogicalRepRelation) +
+				reprelation->natts * sizeof(char *) +
+				reprelation->natts * sizeof(Oid);
+			break;
+
+		case LOGICAL_REP_MSG_STREAM_START:
+			size += sizeof(FileSet);
+			break;
+
+		case LOGICAL_REP_MSG_PREPARE:
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+			size += sizeof(FlushPosition);
+			break;
+
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid logical replication message type \"%c\"", action)));
+	}
+
+	/* update the total size of consumption */
+	add_apply_error_context_xact_size(size);
+}
+
+/*
  * Figure out which write/flush positions to report to the walsender process.
  *
  * We can't simply report back the last LSN the walsender sent us because the
@@ -3352,6 +3593,9 @@ static void
 stream_write_change(char action, StringInfo s)
 {
 	int			len;
+	int			total_len;
+	bool		found;
+	XactSizeEntry *streamed_entry;
 
 	Assert(in_streamed_transaction);
 	Assert(TransactionIdIsValid(stream_xid));
@@ -3370,6 +3614,16 @@ stream_write_change(char action, StringInfo s)
 	len = (s->len - s->cursor);
 
 	BufFileWrite(stream_fd, &s->data[s->cursor], len);
+
+	/* update xact size by xid */
+	total_len = (s->len - s->cursor) * 2 + sizeof(char) + sizeof(action);
+	streamed_entry = (XactSizeEntry *) hash_search(streamingXactSize,
+												   (void *) &stream_xid,
+												   HASH_ENTER, &found);
+	if (!found)
+		streamed_entry->xact_size = total_len; /* init */
+	else
+		streamed_entry->xact_size = streamed_entry->xact_size + total_len; /* update */
 }
 
 /*
@@ -3507,6 +3761,23 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
+	/*
+	 * Initialize the apply worker's hash to manage bytes for
+	 * streaming txns.
+	 */
+	if (!am_tablesync_worker() && MySubscription->stream)
+	{
+		HASHCTL     hash_ctl;
+
+		hash_ctl.keysize = sizeof(TransactionId);
+		hash_ctl.entrysize = sizeof(XactSizeEntry);
+		hash_ctl.hcxt = LogicalStreamingContext;
+		streamingXactSize = hash_create("xact size per streaming xid",
+										PARALLEL_STREAMING_XACTS,
+										&hash_ctl,
+										HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
+
 	if (am_tablesync_worker())
 	{
 		char	   *syncslotname;
@@ -3747,6 +4018,27 @@ reset_apply_error_context_info(void)
 	set_apply_error_context_xact(InvalidTransactionId, 0);
 }
 
+/* Exported so that stats collector can utilize this value */
+int64
+get_apply_error_context_xact_size(void)
+{
+	return apply_error_callback_arg.bytes;
+}
+
+/* Add size to apply error bytes */
+void
+add_apply_error_context_xact_size(int64 size)
+{
+	apply_error_callback_arg.bytes += size;
+}
+
+/* Reset information of apply error callback */
+void
+reset_apply_error_context_xact_size(void)
+{
+	apply_error_callback_arg.bytes = 0;
+}
+
 /*
  * Start skipping changes of the transaction if the given XID matches the
  * transaction ID specified by skip_xid option.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index b58a61d..47eb6da 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2405,7 +2405,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 9
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 15
 	Oid			subid = PG_GETARG_OID(0);
 	Oid			subrelid;
 	TupleDesc	tupdesc;
@@ -2425,19 +2425,31 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 					   OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "relid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "xact_commit",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "xact_commit_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xact_error",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "xact_error_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "xact_abort",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "xact_abort_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_relid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "command",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_command",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_xid",
 					   XIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "last_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "error_message",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "last_error_message",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 14, "last_error_time",
 					   TIMESTAMPTZOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 15, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2462,28 +2474,36 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 	else
 		nulls[i++] = true;
 
-	/* relid */
+	/* transaction stats */
+	values[i++] = Int64GetDatum(wentry->xact_commit);
+	values[i++] = Int64GetDatum(wentry->xact_commit_bytes);
+	values[i++] = Int64GetDatum(wentry->xact_error);
+	values[i++] = Int64GetDatum(wentry->xact_error_bytes);
+	values[i++] = Int64GetDatum(wentry->xact_abort);
+	values[i++] = Int64GetDatum(wentry->xact_abort_bytes);
+
+	/* last_error_relid */
 	if (OidIsValid(wentry->relid))
 		values[i++] = ObjectIdGetDatum(wentry->relid);
 	else
 		nulls[i++] = true;
 
-	/* command */
+	/* last_error_command */
 	if (wentry->command != 0)
 		values[i++] = CStringGetTextDatum(logicalrep_message_type(wentry->command));
 	else
 		nulls[i++] = true;
 
-	/* xid */
+	/* last_error_xid */
 	if (TransactionIdIsValid(wentry->xid))
 		values[i++] = TransactionIdGetDatum(wentry->xid);
 	else
 		nulls[i++] = true;
 
-	/* count */
+	/* last_error_count */
 	values[i++] = Int64GetDatum(wentry->count);
 
-	/* error_message */
+	/* last_error_message */
 	values[i++] = CStringGetTextDatum(wentry->message);
 
 	/* last_error_time */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4543a00..98bf9c1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5385,13 +5385,13 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
-{ oid => '8523', descr => 'statistics: information about subscription error',
+{ oid => '8523', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz,timestamptz}',
-  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,relid,command,xid,count,error_message,last_error_time,stats_reset}',
+  proallargtypes => '{oid,oid,oid,oid,int8,int8,int8,int8,int8,int8,oid,text,xid,int8,text,timestamptz,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,xact_commit,xact_commit_bytes,xact_error,xact_error_bytes,xact_abort,xact_abort_bytes,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time,stats_reset}',
   prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index 694e38b..773e46c 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -110,6 +110,7 @@ typedef struct PartitionPruneState
 	PartitionPruningData *partprunedata[FLEXIBLE_ARRAY_MEMBER];
 } PartitionPruneState;
 
+extern size_t PartitionTupleRoutingSize(void);
 extern PartitionTupleRouting *ExecSetupPartitionTupleRouting(EState *estate,
 															 Relation rel);
 extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 59e3763..33bc169 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -88,6 +88,8 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_SUBWORKERERROR,
 	PGSTAT_MTYPE_SUBWORKERPURGE,
+	PGSTAT_MTYPE_SUBWORKERXACTEND,
+	PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT,
 } StatMsgType;
 
 /* ----------
@@ -591,6 +593,54 @@ typedef struct PgStat_MsgSubWorkerPurge
 } PgStat_MsgSubWorkerPurge;
 
 /* ----------
+ * PgStat_MsgSubscriptionXactEnd	Sent by the apply worker or the table sync worker
+ *									to report successful transaction ends.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerXactEnd
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the worker entry */
+	Oid         m_subid;
+	Oid         m_subrelid;
+
+	/*
+	 * distinguish between transaction commits and streaming transaction aborts
+	 * that are handled without error.
+	 */
+	LogicalRepMsgType m_command;
+
+	/* memory consumption used by transaction */
+	PgStat_Counter m_xact_bytes;
+
+} PgStat_MsgSubWorkerXactEnd;
+
+/* ----------
+ * PgStat_MsgSubWorkerTwophaseXact	Sent by the apply worker to make size of prepared
+ *									txn persistent over the server restart and make it
+ *									visible after commit prepare or rollback prepared.
+ *									This is separated from PgStat_MsgSubWorkerXactEnd
+ *									so that we can reduce message size of gid for other
+ *									operations (e.g. normal COMMIT) that should happen more
+ *									frequently than prepare operation usually.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerTwophaseXact
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the subscription */
+	Oid         m_subid;
+
+	LogicalRepMsgType m_command;
+	char         m_gid[GIDSIZE];
+	int          gid_len;
+	PgStat_Counter m_xact_bytes;
+
+} PgStat_MsgSubWorkerTwophaseXact;
+
+/* ----------
  * PgStat_MsgSubWorkerError		Sent by the apply worker or the table sync worker to
  *								report the error occurred during logical replication.
  * ----------
@@ -609,6 +659,12 @@ typedef struct PgStat_MsgSubWorkerError
 	Oid			m_subrelid;
 
 	/*
+	 * Transaction stats of subscription needs to be updated when an
+	 * error occurs.
+	 */
+	PgStat_Counter m_xact_error_bytes;
+
+	/*
 	 * Oids of the database and the table that the reporter was actually
 	 * processing. m_relid can be InvalidOid if an error occurred during
 	 * worker applying a non-data-modification message such as RELATION.
@@ -803,6 +859,8 @@ typedef union PgStat_Msg
 	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 	PgStat_MsgSubWorkerError msg_subworkererror;
 	PgStat_MsgSubWorkerPurge msg_subworkerpurge;
+	PgStat_MsgSubWorkerXactEnd msg_subworkerxactend;
+	PgStat_MsgSubWorkerTwophaseXact msg_subworkertwophasexact;
 } PgStat_Msg;
 
 
@@ -1035,6 +1093,16 @@ typedef struct PgStat_StatSubWorkerEntry
 	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
 
 	/*
+	 * Transaction statistics of subscription worker
+	 */
+	PgStat_Counter xact_commit;
+	PgStat_Counter xact_commit_bytes;
+	PgStat_Counter xact_error;
+	PgStat_Counter xact_error_bytes;
+	PgStat_Counter xact_abort;
+	PgStat_Counter xact_abort_bytes;
+
+	/*
 	 * Subscription worker error statistics representing an error that
 	 * occurred during application of logical replication or the initial table
 	 * synchronization.
@@ -1049,6 +1117,22 @@ typedef struct PgStat_StatSubWorkerEntry
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubWorkerEntry;
 
+/* prepared transaction */
+typedef struct PgStat_StatSubWorkerPreparedXact
+{
+	Oid			subid;
+	char		gid[GIDSIZE];
+} PgStat_StatSubWorkerPreparedXact;
+
+typedef struct PgStat_StatSubWorkerPreparedXactSize
+{
+	PgStat_StatSubWorkerPreparedXact key; /* hash key */
+
+	Oid			subid;
+	char		gid[GIDSIZE];
+	PgStat_Counter xact_size;
+} PgStat_StatSubWorkerPreparedXactSize;
+
 /*
  * Working state needed to accumulate per-function-call timing statistics.
  */
@@ -1158,6 +1242,13 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_xact_end(Oid subid, Oid subrel,
+											 LogicalRepMsgType command, PgStat_Counter xact_size);
+extern void pgstat_report_subworker_twophase_xact(Oid subid, LogicalRepMsgType command,
+												  PgStat_Counter xact_size,
+												  LogicalRepPreparedTxnData *prepared_data,
+												  LogicalRepCommitPreparedTxnData *commit_data,
+												  LogicalRepRollbackPreparedTxnData *rollback_data);
 extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 										  LogicalRepMsgType command,
 										  TransactionId xid, const char *errmsg);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 2ad61a0..9a8447b 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,9 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+/* for transaction stats */
+extern int64 get_apply_error_context_xact_size(void);
+extern void add_apply_error_context_xact_size(int64 size);
+extern void reset_apply_error_context_xact_size(void);
+
 #endif							/* LOGICALWORKER_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index c6b83ce..eac859f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2097,11 +2097,17 @@ pg_stat_subscription| SELECT su.oid AS subid,
 pg_stat_subscription_workers| SELECT e.subid,
     s.subname,
     e.subrelid,
-    e.relid,
-    e.command,
-    e.xid,
-    e.count,
-    e.error_message,
+    e.xact_commit,
+    e.xact_commit_bytes,
+    e.xact_error,
+    e.xact_error_bytes,
+    e.xact_abort,
+    e.xact_abort_bytes,
+    e.last_error_relid,
+    e.last_error_command,
+    e.last_error_xid,
+    e.last_error_count,
+    e.last_error_message,
     e.last_error_time,
     e.stats_reset
    FROM ( SELECT pg_subscription.oid AS subid,
@@ -2112,7 +2118,7 @@ pg_stat_subscription_workers| SELECT e.subid,
             pg_subscription_rel.srrelid AS relid
            FROM pg_subscription_rel
           WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) e(subid, subrelid, relid, command, xid, count, error_message, last_error_time, stats_reset)
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) e(subid, subrelid, xact_commit, xact_commit_bytes, xact_error, xact_error_bytes, xact_abort, xact_abort_bytes, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time, stats_reset)
      JOIN pg_subscription s ON ((e.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
diff --git a/src/test/subscription/t/025_error_report.pl b/src/test/subscription/t/025_error_report.pl
index 77d22ee..8ab403f 100644
--- a/src/test/subscription/t/025_error_report.pl
+++ b/src/test/subscription/t/025_error_report.pl
@@ -6,8 +6,8 @@
 
 use strict;
 use warnings;
-use PostgresNode;
-use TestLib;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
 use Test::More tests => 14;
 
 # Test if the error reported on pg_subscription_workers view is expected.
@@ -17,8 +17,8 @@ sub test_subscription_error
 
     my $check_sql = qq[
 SELECT count(1) > 0 FROM pg_stat_subscription_workers
-WHERE relid = '$relname'::regclass];
-    $check_sql .= " AND xid = '$xid'::xid;" if $xid ne '';
+WHERE last_error_relid = '$relname'::regclass];
+    $check_sql .= " AND last_error_xid = '$xid'::xid;" if $xid ne '';
 
     # Wait for the error statistics to be updated.
     $node->poll_query_until(
@@ -28,9 +28,9 @@ WHERE relid = '$relname'::regclass];
     my $result = $node->safe_psql(
 	'postgres',
 	qq[
-SELECT subname, command, relid::regclass, count > 0
+SELECT subname, last_error_command, last_error_relid::regclass, last_error_count > 0
 FROM pg_stat_subscription_workers
-WHERE relid = '$relname'::regclass;
+WHERE last_error_relid = '$relname'::regclass;
 ]);
     is($result, $expected_error, $msg);
 }
@@ -46,7 +46,7 @@ sub test_skip_subscription_error
     # Get XID of the failed transaction.
     my $skipxid = $node->safe_psql(
 	'postgres',
-	"SELECT xid FROM pg_stat_subscription_workers WHERE relid = '$relname'::regclass");
+	"SELECT last_error_xid FROM pg_stat_subscription_workers WHERE last_error_relid = '$relname'::regclass");
     is($skipxid, $xid, "remote xid and skip_xid are equal");
 
     $node->safe_psql('postgres',
@@ -65,7 +65,7 @@ WHERE subname = '$subname'
 }
 
 # Create publisher node.
-my $node_publisher = PostgresNode->new('publisher');
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->append_conf('postgresql.conf',
 			     qq[
@@ -75,7 +75,7 @@ logical_decoding_work_mem = 64kB
 $node_publisher->start;
 
 # Create subscriber node.
-my $node_subscriber = PostgresNode->new('subscriber');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 
 # The subscriber will enter an infinite error loop, so we don't want
diff --git a/src/test/subscription/t/026_worker_xact_stats.pl b/src/test/subscription/t/026_worker_xact_stats.pl
new file mode 100644
index 0000000..01b7e8a
--- /dev/null
+++ b/src/test/subscription/t/026_worker_xact_stats.pl
@@ -0,0 +1,133 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for subscription worker statistics during apply.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 3;
+
+# Create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+max_wal_senders = 10
+wal_sender_timeout = 0
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', qq[
+max_prepared_transactions = 10
+]);
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on, two_phase = on, copy_data = off);"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# COMMIT;
+$node_publisher->safe_psql(
+    'postgres',
+    "BEGIN; INSERT INTO test_tab VALUES (1); COMMIT;");
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_commit = 1;")
+  or die "didn't get updates of xact stats by commit";
+
+# Now, stats collector make the bytes updated also.
+my $result = $node_subscriber->safe_psql('postgres',
+"SELECT xact_commit_bytes > 0 FROM pg_stat_subscription_workers;");
+is($result, 't', 'got consumed bytes');
+
+# STREAM COMMIT
+$node_publisher->safe_psql(
+    'postgres',
+    "BEGIN; INSERT INTO test_tab VALUES(generate_series(1001, 2000)); COMMIT;");
+$result = $node_publisher->safe_psql('postgres',
+"SELECT stream_count > 0 FROM pg_stat_replication_slots;");
+is($result, 't', "confirm the streaming has happened.");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_commit = 2;")
+  or die "didn't get updates of xact stats by stream commit";
+
+# STREAM PREPARE & COMMIT PREPARED that restores the xact size before the shutdown.
+$node_publisher->safe_psql(
+    'postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES(generate_series(2001, 3000));
+PREPARE TRANSACTION 'gid1';
+]);
+
+# fetch the last value of xact_commit_bytes to check if
+# this column is increased after the server restart and commit prepared.
+my $tmp = $node_subscriber->safe_psql('postgres',
+"SELECT xact_commit_bytes FROM pg_stat_subscription_workers;");
+$node_subscriber->restart;
+$result = $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid1'");
+
+# stream prepare didn't increment xact_commit but commit prepared does.
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_commit = 3;")
+  or die "didn't get updates of xact stats by stream prepare and commit prepared";
+
+$result = $node_subscriber->safe_psql('postgres',
+"SELECT xact_commit_bytes > $tmp FROM pg_stat_subscription_workers;");
+is($result, 't', "after commit prepared, the xact size is restored");
+
+# STREAM ABORT
+$node_publisher->safe_psql('postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES(generate_series(3001, 4000));
+ROLLBACK;
+]);
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_abort = 1;")
+  or die "didn't get updates of xact stats by stream abort";
+
+# ROLLBACK PREPARED
+$node_publisher->safe_psql('postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES (2);
+PREPARE TRANSACTION 'gid2';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid2'");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_abort = 2;")
+  or die "didn't get updates of xact stats by rollback prepared";
+
+# error stats (by duplication error)
+$node_publisher->safe_psql('postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES (1); -- already inserted this value
+COMMIT;
+]);
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_error > 0;")
+  or die "didn't get updates of xact stats by error";
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
