From 3086779a6a89ba732853e35c43514659fb60f0c2 Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Wed, 2 Mar 2022 00:48:53 +0000
Subject: [PATCH v26] Extend pg_stat_subscription_stats to include general
 transaction statistics

Introduce 2 new subscription statistics columns (apply_commit_count
and apply_rollback_count) to the pg_stat_subscription_stats view
for counting cumulative transaction commits/rollbacks.

To avoid overloading the stats collector, the apply loop sends the
transaction statistics at most once per PGSTAT_STAT_INTERVAL; counts
still pending at worker exit are sent unconditionally.

Author: Takamichi Osumi
Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow,
             Vignesh C, Ajin Cherian, Kyotaro Horiguchi, Tang Haiying,
             Peter Smith
Tested-by: Wang wei
Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com
---
 doc/src/sgml/monitoring.sgml               | 22 ++++++++
 src/backend/catalog/system_views.sql       |  2 +
 src/backend/postmaster/pgstat.c            | 81 ++++++++++++++++++++++++++++++
 src/backend/replication/logical/launcher.c |  2 +
 src/backend/replication/logical/worker.c   | 45 +++++++++++++++++
 src/backend/utils/adt/pgstatfuncs.c        | 18 +++++--
 src/include/catalog/pg_proc.dat            |  6 +--
 src/include/pgstat.h                       | 23 +++++++++
 src/include/replication/worker_internal.h  |  9 ++++
 src/test/regress/expected/rules.out        |  4 +-
 src/tools/pgindent/typedefs.list           |  2 +
 11 files changed, 206 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 9fb62fe..d1eff70 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -3128,6 +3128,28 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>apply_commit_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription.
+       Both <command>COMMIT</command> and <command>COMMIT PREPARED</command>
+       increment this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>apply_rollback_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions rolled back in this subscription. Both
+       <command>ROLLBACK</command> and <command>ROLLBACK PREPARED</command>
+       increment this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 40b7bca..eae957f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1270,6 +1270,8 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.apply_commit_count,
+        ss.apply_rollback_count,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 53ddd93..e6d365d 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -55,6 +55,7 @@
 #include "postmaster/postmaster.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
+#include "replication/worker_internal.h"
 #include "storage/backendid.h"
 #include "storage/dsm.h"
 #include "storage/fd.h"
@@ -286,6 +287,8 @@ static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
 static HTAB *subscriptionStatHash = NULL;
 
+extern LogicalRepSubscriptionStats subStats;
+
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
  * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
@@ -382,6 +385,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len);
 static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len);
+static void pgstat_recv_subscription_xact(PgStat_MsgSubscriptionXact *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1962,6 +1966,58 @@ pgstat_report_subscription_drop(Oid subid)
 }
 
 /* ----------
+ * pgstat_report_subscription_xact() -
+ *
+ *	Tell the collector about a subscription's transaction stats.
+ *	The statistics are cleared upon sending.
+ *
+ *	Setting 'force' to true skips the PGSTAT_STAT_INTERVAL throttle, so
+ *	that no pending commit/rollback counts are lost before the logical
+ *	replication worker exits.
+ * ----------
+ */
+void
+pgstat_report_subscription_xact(bool force)
+{
+	static TimestampTz last_report = 0;
+	PgStat_MsgSubscriptionXact msg;
+
+	/* Bail out early if no subscription is set or nothing is pending */
+	if (!OidIsValid(subStats.subid) ||
+		(subStats.apply_commit_count == 0 && subStats.apply_rollback_count == 0))
+		return;
+
+	if (!force)
+	{
+		TimestampTz now = GetCurrentTimestamp();
+
+		/*
+		 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
+		 * msec since we last sent one. This is to avoid overloading the stats
+		 * collector.
+		 */
+		if (!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
+			return;
+		last_report = now;
+	}
+
+	/*
+	 * Prepare and send the message.
+	 */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONXACT);
+	msg.m_subid = subStats.subid;
+	msg.apply_commit_count = subStats.apply_commit_count;
+	msg.apply_rollback_count = subStats.apply_rollback_count;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionXact));
+
+	/*
+	 * Clear out the statistics.
+	 */
+	subStats.apply_commit_count = 0;
+	subStats.apply_rollback_count = 0;
+}
+
+/* ----------
  * pgstat_ping() -
  *
  *	Send some junk data to the collector to increase traffic.
@@ -3687,6 +3743,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subscription_error(&msg.msg_subscriptionerror, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBSCRIPTIONXACT:
+					pgstat_recv_subscription_xact(&msg.msg_subscriptionxact, len);
+					break;
+
 				default:
 					break;
 			}
@@ -6092,6 +6152,25 @@ pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len)
 }
 
 /* ----------
+ * pgstat_recv_subscription_xact() -
+ *
+ *	Process a SUBSCRIPTIONXACT message by adding its counts to the entry.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_xact(PgStat_MsgSubscriptionXact *msg, int len)
+{
+	PgStat_StatSubEntry *subentry;
+
+	/* Get the stats entry of the subscription named in the message */
+	subentry = pgstat_get_subscription_entry(msg->m_subid, true);
+	Assert(subentry);
+
+	subentry->apply_commit_count += msg->apply_commit_count;
+	subentry->apply_rollback_count += msg->apply_rollback_count;
+}
+
+/* ----------
  * pgstat_write_statsfile_needed() -
  *
  *	Do we need to write out any stats files?
@@ -6268,6 +6347,8 @@ pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts)
 {
 	subentry->apply_error_count = 0;
 	subentry->sync_error_count = 0;
+	subentry->apply_commit_count = 0;
+	subentry->apply_rollback_count = 0;
 	subentry->stat_reset_timestamp = ts;
 }
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 5a68d6d..4dfcac8 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -647,6 +647,8 @@ logicalrep_worker_onexit(int code, Datum arg)
 	if (LogRepWorkerWalRcvConn)
 		walrcv_disconnect(LogRepWorkerWalRcvConn);
 
+	pgstat_report_subscription_xact(true);
+
 	logicalrep_worker_detach();
 
 	/* Cleanup fileset used for streaming transactions. */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7e267f7..95ec2eb 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -238,6 +238,13 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 	.ts = 0,
 };
 
+LogicalRepSubscriptionStats subStats =
+{
+	.subid = InvalidOid,
+	.apply_commit_count = 0,
+	.apply_rollback_count = 0,
+};
+
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -329,6 +336,9 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
 /* Compute GID for two_phase transactions */
 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
 
+static void subscription_stats_incr_commit(void);
+static void subscription_stats_incr_rollback(void);
+
 /* Common streaming function to apply all the spooled messages */
 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
@@ -959,6 +969,8 @@ apply_handle_commit_prepared(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
+	subscription_stats_incr_commit();
+
 	store_flush_position(prepare_data.end_lsn);
 	in_remote_transaction = false;
 
@@ -1006,6 +1018,8 @@ apply_handle_rollback_prepared(StringInfo s)
 		FinishPreparedTransaction(gid, false);
 		end_replication_step();
 		CommitTransactionCommand();
+
+		subscription_stats_incr_rollback();
 	}
 
 	pgstat_report_stat(false);
@@ -1217,6 +1231,8 @@ apply_handle_stream_abort(StringInfo s)
 	{
 		set_apply_error_context_xact(xid, 0);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+		subscription_stats_incr_rollback();
 	}
 	else
 	{
@@ -1463,6 +1479,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 		CommitTransactionCommand();
 		pgstat_report_stat(false);
 
+		subscription_stats_incr_commit();
+
 		store_flush_position(commit_data->end_lsn);
 	}
 	else
@@ -2717,6 +2735,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (endofstream)
 			break;
 
+		pgstat_report_subscription_xact(false);
+
 		/*
 		 * Wait for more data or latch.  If we have unflushed transactions,
 		 * wake up after WalWriterDelay to see if they've been flushed yet (in
@@ -3372,6 +3392,28 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
 	snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
 }
 
+/*
+ * Count one committed transaction in this worker's subscription statistics.
+ */
+static void
+subscription_stats_incr_commit(void)
+{
+	Assert(OidIsValid(subStats.subid));
+
+	subStats.apply_commit_count++;
+}
+
+/*
+ * Count one rolled-back transaction in this worker's subscription statistics.
+ */
+static void
+subscription_stats_incr_rollback(void)
+{
+	Assert(OidIsValid(subStats.subid));
+
+	subStats.apply_rollback_count++;
+}
+
 /* Logical Replication Apply worker entry point */
 void
 ApplyWorkerMain(Datum main_arg)
@@ -3469,6 +3511,9 @@ ApplyWorkerMain(Datum main_arg)
 
 	CommitTransactionCommand();
 
+	/* Set the subid for subscription statistics */
+	subStats.subid = MyLogicalRepWorker->subid;
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index fd993d0..c3d4be7 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2405,7 +2405,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	6
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS];
@@ -2424,7 +2424,11 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "apply_commit_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "apply_rollback_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2448,11 +2452,17 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	/* sync_error_count */
 	values[2] = Int64GetDatum(subentry->sync_error_count);
 
+	/* apply_commit_count */
+	values[3] = Int64GetDatum(subentry->apply_commit_count);
+
+	/* apply_rollback_count */
+	values[4] = Int64GetDatum(subentry->apply_rollback_count);
+
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[5] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[5] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bf88858..5a18b9a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5380,9 +5380,9 @@
   proname => 'pg_stat_get_subscription_stats', proisstrict => 'f',
   provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,apply_commit_count,apply_rollback_count,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index be2f7e2..7588d6a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -87,6 +87,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_DISCONNECT,
 	PGSTAT_MTYPE_SUBSCRIPTIONDROP,
 	PGSTAT_MTYPE_SUBSCRIPTIONERROR,
+	PGSTAT_MTYPE_SUBSCRIPTIONXACT
 } StatMsgType;
 
 /* ----------
@@ -577,6 +578,23 @@ typedef struct PgStat_MsgSubscriptionError
 	bool		m_is_apply_error;
 } PgStat_MsgSubscriptionError;
 
+
+/* ----------
+ * PgStat_MsgSubscriptionXact	Sent by the subscription worker to report
+ *								transaction commit and rollback counts.
+ * ----------
+ */
+typedef struct PgStat_MsgSubscriptionXact
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* identifies the subscription entry to update */
+	Oid			m_subid;
+
+	PgStat_Counter apply_commit_count;
+	PgStat_Counter apply_rollback_count;
+} PgStat_MsgSubscriptionXact;
+
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
  * ----------
@@ -757,6 +775,7 @@ typedef union PgStat_Msg
 	PgStat_MsgDisconnect msg_disconnect;
 	PgStat_MsgSubscriptionError msg_subscriptionerror;
 	PgStat_MsgSubscriptionDrop msg_subscriptiondrop;
+	PgStat_MsgSubscriptionXact msg_subscriptionxact;
 } PgStat_Msg;
 
 
@@ -981,6 +1000,9 @@ typedef struct PgStat_StatSubEntry
 
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter apply_commit_count;
+	PgStat_Counter apply_rollback_count;
+
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -1177,6 +1199,7 @@ extern void pgstat_send_archiver(const char *xlog, bool failed);
 extern void pgstat_send_bgwriter(void);
 extern void pgstat_send_checkpointer(void);
 extern void pgstat_send_wal(bool force);
+extern void pgstat_report_subscription_xact(bool force);
 
 /* ----------
  * Support functions for the SQL-callable functions to
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 3c3f5f6..c5d01fa 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -68,6 +68,15 @@ typedef struct LogicalRepWorker
 	TimestampTz reply_time;
 } LogicalRepWorker;
 
+/* Commit/rollback counts of a worker not yet sent to the stats collector */
+typedef struct LogicalRepSubscriptionStats
+{
+	Oid			subid;
+
+	int64		apply_commit_count;
+	int64		apply_rollback_count;
+} LogicalRepSubscriptionStats;
+
 /* Main memory context for apply worker. Permanent during worker lifetime. */
 extern MemoryContext ApplyContext;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index ac46856..a7ef303 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2076,9 +2076,11 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.apply_commit_count,
+    ss.apply_rollback_count,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, apply_commit_count, apply_rollback_count, stats_reset);
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
     pg_stat_all_indexes.schemaname,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d9b83f7..c5f7aec 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1401,6 +1401,7 @@ LogicalRepRelId
 LogicalRepRelMapEntry
 LogicalRepRelation
 LogicalRepRollbackPreparedTxnData
+LogicalRepSubscriptionStats
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
@@ -1947,6 +1948,7 @@ PgStat_MsgResetsubcounter
 PgStat_MsgSLRU
 PgStat_MsgSubscriptionDrop
 PgStat_MsgSubscriptionError
+PgStat_MsgSubscriptionXact
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
-- 
1.8.3.1

