From a28de2f6a92cf4b72246edbc57484873652ae286 Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Thu, 16 Dec 2021 11:11:55 +0000
Subject: [PATCH v17 2/2] Extend pg_stat_subscription_workers to include
 general transaction statistics

Categorize transactions of logical replication subscriber
into three types (commit, abort, error) and introduce
cumulative columns of those numbers in the pg_stat_subscription_workers.

In order to avoid having a large number of entries to be created
by the table synchronization, the new stats columns are utilized
only by the apply worker.

Author: Takamichi Osumi
Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow, Vignesh C, Ajin Cherian
Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com
---
 doc/src/sgml/monitoring.sgml                |  42 ++++++++++-
 src/backend/catalog/system_views.sql        |   3 +
 src/backend/postmaster/pgstat.c             |  74 +++++++++++++++++++
 src/backend/replication/logical/worker.c    |  16 ++++
 src/backend/utils/adt/pgstatfuncs.c         |  25 +++++--
 src/include/catalog/pg_proc.dat             |   6 +-
 src/include/pgstat.h                        |  28 +++++++
 src/test/regress/expected/rules.out         |   5 +-
 src/test/subscription/t/026_worker_stats.pl | 109 +++++++++++++++++++++++++++-
 src/tools/pgindent/typedefs.list            |   1 +
 10 files changed, 290 insertions(+), 19 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 62f2a33..e795a1d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -629,8 +629,8 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
-      <entry>One row per subscription worker, showing statistics about errors
-      that occurred on that subscription worker.
+      <entry>One row per subscription worker, showing statistics about transactions
+      and errors that occurred on that subscription worker.
       See <link linkend="monitoring-pg-stat-subscription-workers">
       <structname>pg_stat_subscription_workers</structname></link> for details.
       </entry>
@@ -3074,8 +3074,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
    The <structname>pg_stat_subscription_workers</structname> view will contain
    one row per subscription worker on which errors have occurred, for workers
    applying logical replication changes and workers handling the initial data
-   copy of the subscribed tables.  The statistics entry is removed when the
-   corresponding subscription is dropped.
+   copy of the subscribed tables. The row corresponding to the apply
+   worker shows transaction statistics of the main apply worker on the
+   subscription. The statistics entry is removed when the corresponding
+   subscription is dropped.
   </para>
 
   <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
@@ -3123,6 +3125,38 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>commit_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription.
+       Both COMMIT and COMMIT PREPARED increment this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>abort_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription.
+       ROLLBACK PREPARED increments this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions that failed to be applied by the table
+       sync worker or main apply worker in this subscription. This
+       counter is updated after confirming the error is not same as
+       the previous one.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>last_error_relid</structfield> <type>oid</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 61b515c..dda3cd7 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1267,6 +1267,9 @@ CREATE VIEW pg_stat_subscription_workers AS
         w.subid,
         s.subname,
         w.subrelid,
+        w.commit_count,
+        w.abort_count,
+        w.error_count,
         w.last_error_relid,
         w.last_error_command,
         w.last_error_xid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 7264d2c..bac41ca 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -382,6 +382,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1949,6 +1950,32 @@ pgstat_report_replslot_drop(const char *slotname)
 }
 
 /* ----------
+ * pgstat_report_subworker_xact_end() -
+ *
+ *  Tell the collector that worker transaction has successfully completed.
+ *  This should be called before the call of process_syning_tables() not to
+ *  miss an increment of transaction stats in case it leads to a process exit.
+ *  See process_syncing_tables_for_apply().
+ * ----------
+ */
+void
+pgstat_report_subworker_xact_end(Oid subid, LogicalRepMsgType command)
+{
+	PgStat_MsgSubWorkerXactEnd msg;
+
+	Assert(command == LOGICAL_REP_MSG_COMMIT ||
+		   command == LOGICAL_REP_MSG_STREAM_COMMIT ||
+		   command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+		   command == LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERXACTEND);
+	msg.m_databaseid = MyDatabaseId;
+	msg.m_subid = subid;
+	msg.m_command = command;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerXactEnd));
+}
+
+/* ----------
  * pgstat_report_subworker_error() -
  *
  *	Tell the collector about the subscription worker error.
@@ -3746,6 +3773,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBWORKERXACTEND:
+					pgstat_recv_subworker_xact_end(&msg.msg_subworkerxactend, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3965,6 +3996,9 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
 	/* If not found, initialize the new one */
 	if (!found)
 	{
+		subwentry->commit_count = 0;
+		subwentry->abort_count = 0;
+		subwentry->error_count = 0;
 		subwentry->last_error_relid = InvalidOid;
 		subwentry->last_error_command = 0;
 		subwentry->last_error_xid = InvalidTransactionId;
@@ -6153,6 +6187,39 @@ pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
 }
 
 /* ----------
+ * pgstat_recv_subworker_xact_end() -
+ *
+ *	Process a SUBWORKERXACTEND message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len)
+{
+	PgStat_StatDBEntry *dbentry;
+	PgStat_StatSubWorkerEntry *wentry;
+
+	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+	wentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
+										InvalidOid, true);
+	Assert(wentry);
+
+	switch (msg->m_command)
+	{
+		case LOGICAL_REP_MSG_COMMIT:
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+			wentry->commit_count++;
+			break;
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+			wentry->abort_count++;
+			break;
+		default:
+			elog(ERROR, "unexpected logical message type as transaction end");
+			break;
+	}
+}
+
+/* ----------
  * pgstat_recv_subworker_error() -
  *
  *	Process a SUBWORKERERROR message.
@@ -6193,6 +6260,13 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 	subwentry->last_error_time = msg->m_timestamp;
 	strlcpy(subwentry->last_error_message, msg->m_message,
 			PGSTAT_SUBWORKERERROR_MSGLEN);
+
+	/*
+	 * Only if this is a new error reported by the apply worker, increment the
+	 * counter of error.
+	 */
+	if (!OidIsValid(msg->m_subrelid))
+		subwentry->error_count++;
 }
 
 /* ----------
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2e79302..45e7bf4 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -815,6 +815,10 @@ apply_handle_commit(StringInfo s)
 
 	apply_handle_commit_internal(&commit_data);
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 LOGICAL_REP_MSG_COMMIT);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
@@ -960,6 +964,10 @@ apply_handle_commit_prepared(StringInfo s)
 	store_flush_position(prepare_data.end_lsn);
 	in_remote_transaction = false;
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 LOGICAL_REP_MSG_COMMIT_PREPARED);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
@@ -1011,6 +1019,10 @@ apply_handle_rollback_prepared(StringInfo s)
 	store_flush_position(rollback_data.rollback_end_lsn);
 	in_remote_transaction = false;
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(rollback_data.rollback_end_lsn);
 
@@ -1435,6 +1447,10 @@ apply_handle_stream_commit(StringInfo s)
 	/* unlink the files with serialized changes and subxact info */
 	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 LOGICAL_REP_MSG_STREAM_COMMIT);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index f529c15..87b5254 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2415,7 +2415,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	8
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	11
 	Oid			subid = PG_GETARG_OID(0);
 	Oid			subrelid;
 	TupleDesc	tupdesc;
@@ -2442,17 +2442,23 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 					   OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "commit_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "abort_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "error_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_relid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_command",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_xid",
 					   XIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_message",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_time",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2470,6 +2476,11 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 	else
 		nulls[i++] = true;
 
+	/* transaction stats */
+	values[i++] = Int64GetDatum(wentry->commit_count);
+	values[i++] = Int64GetDatum(wentry->abort_count);
+	values[i++] = Int64GetDatum(wentry->error_count);
+
 	/* last_error_relid */
 	if (OidIsValid(wentry->last_error_relid))
 		values[i++] = ObjectIdGetDatum(wentry->last_error_relid);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4d992dc..b76f84a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5375,9 +5375,9 @@
   proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}',
-  proargmodes => '{i,i,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
+  proallargtypes => '{oid,oid,oid,oid,int8,int8,int8,oid,text,xid,int8,text,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,commit_count,abort_count,error_count,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
   prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5b51b58..dd698ab 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -86,6 +86,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_DISCONNECT,
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_SUBWORKERERROR,
+	PGSTAT_MTYPE_SUBWORKERXACTEND
 } StatMsgType;
 
 /* ----------
@@ -558,6 +559,24 @@ typedef struct PgStat_MsgSubscriptionPurge
 } PgStat_MsgSubscriptionPurge;
 
 /* ----------
+ * PgStat_MsgSubscriptionXactEnd	Sent by the apply worker to report transaction
+ *									ends.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerXactEnd
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the worker entry */
+	Oid			m_databaseid;
+	Oid			m_subid;
+
+	/* necessary to determine column to increment */
+	LogicalRepMsgType m_command;
+
+} PgStat_MsgSubWorkerXactEnd;
+
+/* ----------
  * PgStat_MsgSubWorkerError		Sent by the apply worker or the table sync
  *								worker to report the error occurred while
  *								processing changes.
@@ -769,6 +788,7 @@ typedef union PgStat_Msg
 	PgStat_MsgDisconnect msg_disconnect;
 	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 	PgStat_MsgSubWorkerError msg_subworkererror;
+	PgStat_MsgSubWorkerXactEnd msg_subworkerxactend;
 } PgStat_Msg;
 
 
@@ -1010,6 +1030,13 @@ typedef struct PgStat_StatSubWorkerEntry
 	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
 
 	/*
+	 * Cumulative transaction statistics of subscription worker
+	 */
+	PgStat_Counter commit_count;
+	PgStat_Counter abort_count;
+	PgStat_Counter error_count;
+
+	/*
 	 * Subscription worker error statistics representing an error that
 	 * occurred during application of changes or the initial table
 	 * synchronization.
@@ -1131,6 +1158,7 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_xact_end(Oid subid, LogicalRepMsgType command);
 extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 										  LogicalRepMsgType command,
 										  TransactionId xid, const char *errmsg);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index b58b062..2b0bfae 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2097,6 +2097,9 @@ pg_stat_subscription| SELECT su.oid AS subid,
 pg_stat_subscription_workers| SELECT w.subid,
     s.subname,
     w.subrelid,
+    w.commit_count,
+    w.abort_count,
+    w.error_count,
     w.last_error_relid,
     w.last_error_command,
     w.last_error_xid,
@@ -2110,7 +2113,7 @@ pg_stat_subscription_workers| SELECT w.subid,
          SELECT pg_subscription_rel.srsubid AS subid,
             pg_subscription_rel.srrelid AS relid
            FROM pg_subscription_rel) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, commit_count, abort_count, error_count, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
      JOIN pg_subscription s ON ((w.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl
index 8005c54..99253f4 100644
--- a/src/test/subscription/t/026_worker_stats.pl
+++ b/src/test/subscription/t/026_worker_stats.pl
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2021, PostgreSQL Global Development Group
 
-# Tests for subscription error stats.
+# Tests for subscription stats.
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -44,9 +44,32 @@ WHERE last_error_relid = '$relname'::regclass
 	  or die "Timed out while waiting for " . $msg;
 }
 
+# Test whether the update of general transaction stats satisfies the expected
+# condition or not.
+sub confirm_transaction_stats_update
+{
+	my ($node, $condition, $msg) = @_;
+
+	# Check only the stats of the apply worker
+	my $sql = qq[
+SELECT count(1) = 1
+FROM pg_stat_subscription_workers
+WHERE subrelid IS NULL AND $condition];
+
+	$node->poll_query_until('postgres', $sql)
+	  or die "Timed out while waiting for " . $msg;
+}
+
 # Create publisher node.
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+	'postgresql.conf', qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+max_wal_senders = 10
+wal_sender_timeout = 0
+]);
 $node_publisher->start;
 
 # Create subscriber node.
@@ -58,6 +81,7 @@ $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->append_conf(
 	'postgresql.conf',
 	qq[
+max_prepared_transactions = 10
 wal_retrieve_retry_interval = 2s
 ]);
 $node_subscriber->start;
@@ -98,7 +122,7 @@ is($result, qq(0), 'check no subscription error');
 # Create subscription. The table sync for test_tab2 on tap_sub will enter into
 # infinite error loop due to violating the unique constraint.
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub;"
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on, two_phase = on);"
 );
 
 $node_publisher->wait_for_catchup('tap_sub');
@@ -132,6 +156,11 @@ test_subscription_error(
 	qq(duplicate key value violates unique constraint),
 	'error reported by the apply worker');
 
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'error_count = 1',
+	'the error_count increment by the apply worker');
+
 # Check the table sync worker's error in the view.
 test_subscription_error(
 	$node_subscriber, 'test_tab2', '', '',
@@ -150,11 +179,83 @@ $node_subscriber->poll_query_until('postgres',
 $node_subscriber->poll_query_until('postgres',
 	"SELECT count(1) > 0 FROM test_tab2");
 
-# There shouldn't be any errors in the view after dropping the subscription.
+# Check updation of subscription worker transaction count statistics.
+# COMMIT of an insertion of single record to test_tab1
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 1',
+	'the commit_count increment by the apply worker');
+
+# Some more tests for transaction stats
+# PREPARE & COMMIT PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (2);
+PREPARE TRANSACTION 'gid1'
+]);
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid1'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 2',
+	'the commit_count increment by commit prepared');
+
+# STREAM COMMIT
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES(generate_series(1001, 2000));
+COMMIT;
+]);
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 3',
+	'the commit_count increment by stream commit');
+
+# STREAM PREPARE & COMMIT PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES(generate_series(2001, 3000));
+PREPARE TRANSACTION 'gid2'
+]);
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid2'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 4',
+	'the commit_count increment by streamed commit prepared');
+
+# ROLLBACK PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (3);
+PREPARE TRANSACTION 'gid3';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid3'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'abort_count = 1',
+	'the abort_count increment by rollback prepared');
+
+# STREAM PREPARE & ROLLBACK PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (generate_series(3001, 4000));
+PREPARE TRANSACTION 'gid4';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid4'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'abort_count = 2',
+	'the abort_count increment by streamed rollback prepared');
+
+# There shouldn't be any records in the view after dropping the subscription.
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub;");
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(1) FROM pg_stat_subscription_workers");
-is($result, q(0), 'no error after dropping subscription');
+is($result, q(0), 'no record after dropping subscription');
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 0c61ccb..238e9fa 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1947,6 +1947,7 @@ PgStat_MsgResetslrucounter
 PgStat_MsgSLRU
 PgStat_MsgSubscriptionPurge
 PgStat_MsgSubWorkerError
+PgStat_MsgSubWorkerXactEnd
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
-- 
1.8.3.1

