From e5191f045484d5cd27578868598dec94fcbc06db Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Sat, 16 Nov 2019 18:24:00 +0530
Subject: [PATCH 2/2] Track statistics for spilling of changes from
 ReorderBuffer.

This adds the statistics about transactions spilled to disk from
ReorderBuffer.  Users can query the pg_stat_replication view to check
these stats.

Author: Tomas Vondra, with bug-fixes and minor changes by Dilip Kumar
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
---
 doc/src/sgml/monitoring.sgml                    | 20 ++++++++++++
 src/backend/catalog/system_views.sql            |  5 ++-
 src/backend/replication/logical/reorderbuffer.c | 12 +++++++
 src/backend/replication/walsender.c             | 42 +++++++++++++++++++++++--
 src/include/catalog/pg_proc.dat                 |  6 ++--
 src/include/replication/reorderbuffer.h         | 11 +++++++
 src/include/replication/walsender_private.h     |  5 +++
 src/test/regress/expected/rules.out             |  7 +++--
 8 files changed, 100 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 901fee9..a3c5f86 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1972,6 +1972,26 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
      <entry><type>timestamp with time zone</type></entry>
      <entry>Send time of last reply message received from standby server</entry>
     </row>
+    <row>
+     <entry><structfield>spill_bytes</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Amount of decoded transaction data spilled to disk.</entry>
+    </row>
+    <row>
+     <entry><structfield>spill_txns</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of transactions spilled to disk after the memory used by
+      logical decoding exceeds <literal>logical_decoding_work_mem</literal>. The
+      counter gets incremented both for toplevel transactions and
+      subtransactions.</entry>
+    </row>
+    <row>
+     <entry><structfield>spill_count</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of times transactions were spilled to disk. Transactions
+      may get spilled repeatedly, and this counter gets incremented on every
+      such invocation.</entry>
+    </row>
    </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 4456fef..f7800f0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -776,7 +776,10 @@ CREATE VIEW pg_stat_replication AS
             W.replay_lag,
             W.sync_priority,
             W.sync_state,
-            W.reply_time
+            W.reply_time,
+            W.spill_txns,
+            W.spill_count,
+            W.spill_bytes
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index d82a5f1..53affeb 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -308,6 +308,10 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	buffer->spillCount = 0;
+	buffer->spillTxns = 0;
+	buffer->spillBytes = 0;
+
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
 	dlist_init(&buffer->toplevel_by_lsn);
@@ -2415,6 +2419,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	int			fd = -1;
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
+	Size		size = txn->size;
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -2473,6 +2478,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		spilled++;
 	}
 
+	/* update the statistics */
+	rb->spillCount += 1;
+	rb->spillBytes += size;
+
+	/* Don't consider already serialized transaction. */
+	rb->spillTxns += txn->serialized ? 0 : 1;
+
 	Assert(spilled == txn->nentries_mem);
 	Assert(dlist_is_empty(&txn->changes));
 	txn->nentries_mem = 0;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7f56715..fa75872 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,6 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
+static void UpdateSpillStats(LogicalDecodingContext *ctx);
 static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
 
 
@@ -1261,7 +1262,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 /*
  * LogicalDecodingContext 'update_progress' callback.
  *
- * Write the current position to the lag tracker (see XLogSendPhysical).
+ * Write the current position to the lag tracker (see XLogSendPhysical),
+ * and update the spill statistics.
  */
 static void
 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
@@ -1280,6 +1282,11 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 
 	LagTrackerWrite(lsn, now);
 	sendTime = now;
+
+	/*
+	 * Update statistics about transactions that spilled to disk.
+	 */
+	UpdateSpillStats(ctx);
 }
 
 /*
@@ -2318,6 +2325,9 @@ InitWalSenderSlot(void)
 			walsnd->state = WALSNDSTATE_STARTUP;
 			walsnd->latch = &MyProc->procLatch;
 			walsnd->replyTime = 0;
+			walsnd->spillTxns = 0;
+			walsnd->spillCount = 0;
+			walsnd->spillBytes = 0;
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
 			MyWalSnd = (WalSnd *) walsnd;
@@ -3219,7 +3229,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	12
+#define PG_STAT_GET_WAL_SENDERS_COLS	15
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -3274,6 +3284,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		int			pid;
 		WalSndState state;
 		TimestampTz replyTime;
+		int64		spillTxns;
+		int64		spillCount;
+		int64		spillBytes;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
@@ -3294,6 +3307,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		applyLag = walsnd->applyLag;
 		priority = walsnd->sync_standby_priority;
 		replyTime = walsnd->replyTime;
+		spillTxns = walsnd->spillTxns;
+		spillCount = walsnd->spillCount;
+		spillBytes = walsnd->spillBytes;
 		SpinLockRelease(&walsnd->mutex);
 
 		memset(nulls, 0, sizeof(nulls));
@@ -3375,6 +3391,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 				nulls[11] = true;
 			else
 				values[11] = TimestampTzGetDatum(replyTime);
+
+			/* spill to disk */
+			values[12] = Int64GetDatum(spillTxns);
+			values[13] = Int64GetDatum(spillCount);
+			values[14] = Int64GetDatum(spillBytes);
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -3611,3 +3632,20 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 	Assert(time != 0);
 	return now - time;
 }
+
+static void
+UpdateSpillStats(LogicalDecodingContext *ctx)
+{
+	ReorderBuffer *rb = ctx->reorder;
+
+	SpinLockAcquire(&MyWalSnd->mutex);
+
+	MyWalSnd->spillTxns = rb->spillTxns;
+	MyWalSnd->spillCount = rb->spillCount;
+	MyWalSnd->spillBytes = rb->spillBytes;
+
+	elog(DEBUG2, "UpdateSpillStats: updating stats %p %ld %ld %ld",
+		 rb, rb->spillTxns, rb->spillCount, rb->spillBytes);
+
+	SpinLockRelease(&MyWalSnd->mutex);
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 58ea5b9..fa0a2a1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5166,9 +5166,9 @@
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7c94d92..0867ee9 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -402,6 +402,17 @@ struct ReorderBuffer
 
 	/* memory accounting */
 	Size		size;
+
+	/*
+	 * Statistics about transactions spilled to disk.
+	 *
+	 * A single transaction may be spilled repeatedly, which is why we keep
+	 * two different counters. For spilling, the transaction counter includes
+	 * both toplevel transactions and subtransactions.
+	 */
+	int64		spillCount;		/* spill-to-disk invocation counter */
+	int64		spillTxns;		/* number of transactions spilled to disk  */
+	int64		spillBytes;		/* amount of data spilled to disk */
 };
 
 
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 0dd6d1c..a6b3205 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -80,6 +80,11 @@ typedef struct WalSnd
 	 * Timestamp of the last message received from standby.
 	 */
 	TimestampTz replyTime;
+
+	/* Statistics for transactions spilled to disk. */
+	int64		spillTxns;
+	int64		spillCount;
+	int64		spillBytes;
 } WalSnd;
 
 extern WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 14e7214..22e6c86 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1952,9 +1952,12 @@ pg_stat_replication| SELECT s.pid,
     w.replay_lag,
     w.sync_priority,
     w.sync_state,
-    w.reply_time
+    w.reply_time,
+    w.spill_txns,
+    w.spill_count,
+    w.spill_bytes
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,
-- 
1.8.3.1

