From b56b6d8c8686e7dd2b77c25d3d9d61908b769878 Mon Sep 17 00:00:00 2001
From: jcoleman <jtc331@gmail.com>
Date: Fri, 9 Jun 2023 20:12:31 -0500
Subject: [PATCH v2] Add last commit's LSN to pg_stat_database

---
 doc/src/sgml/monitoring.sgml                 |  9 ++++++++
 src/backend/access/transam/twophase.c        | 11 +++++----
 src/backend/access/transam/xact.c            | 11 +++++----
 src/backend/catalog/system_views.sql         |  1 +
 src/backend/utils/activity/pgstat_database.c | 20 +++++++++++++++-
 src/backend/utils/activity/pgstat_xact.c     |  4 ++--
 src/backend/utils/adt/pgstatfuncs.c          |  3 +++
 src/include/catalog/pg_proc.dat              |  4 ++++
 src/include/pgstat.h                         |  4 +++-
 src/include/utils/pgstat_internal.h          |  2 +-
 src/test/regress/expected/rules.out          |  1 +
 src/test/regress/expected/stats.out          | 24 ++++++++++++++++++++
 src/test/regress/sql/stats.sql               | 10 ++++++++
 13 files changed, 90 insertions(+), 14 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index df5242fa80..6f2ea1ddbe 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4414,6 +4414,15 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_commit_lsn</structfield> <type>pg_lsn</type>
+      </para>
+      <para>
+       Write-ahead log location of the last commit in this database
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>xact_commit</structfield> <type>bigint</type>
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 068e59bec0..c38e36e37c 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -200,7 +200,7 @@ static GlobalTransaction MyLockedGxact = NULL;
 
 static bool twophaseExitRegistered = false;
 
-static void RecordTransactionCommitPrepared(TransactionId xid,
+static XLogRecPtr RecordTransactionCommitPrepared(TransactionId xid,
 											int nchildren,
 											TransactionId *children,
 											int nrels,
@@ -1494,6 +1494,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	xl_xact_stats_item *commitstats;
 	xl_xact_stats_item *abortstats;
 	SharedInvalidationMessage *invalmsgs;
+	XLogRecPtr commit_lsn = InvalidXLogRecPtr;
 
 	/*
 	 * Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1549,7 +1550,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 * callbacks will release the locks the transaction held.
 	 */
 	if (isCommit)
-		RecordTransactionCommitPrepared(xid,
+		commit_lsn = RecordTransactionCommitPrepared(xid,
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
 										hdr->ncommitstats,
@@ -1644,7 +1645,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	LWLockRelease(TwoPhaseStateLock);
 
 	/* Count the prepared xact as committed or aborted */
-	AtEOXact_PgStat(isCommit, false);
+	AtEOXact_PgStat(isCommit, commit_lsn, false);
 
 	/*
 	 * And now we can clean up any files we may have left.
@@ -2278,7 +2279,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
  * We know the transaction made at least one XLOG entry (its PREPARE),
  * so it is never possible to optimize out the commit record.
  */
-static void
+static XLogRecPtr
 RecordTransactionCommitPrepared(TransactionId xid,
 								int nchildren,
 								TransactionId *children,
@@ -2366,6 +2367,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * in the procarray and continue to hold locks.
 	 */
 	SyncRepWaitForLSN(recptr, true);
+
+	return recptr;
 }
 
 /*
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8daaa535ed..895e103c70 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1280,7 +1280,7 @@ AtSubStart_ResourceOwner(void)
  * If you change this function, see RecordTransactionCommitPrepared also.
  */
 static TransactionId
-RecordTransactionCommit(void)
+RecordTransactionCommit(XLogRecPtr *commit_lsn)
 {
 	TransactionId xid = GetTopTransactionIdIfAny();
 	bool		markXidCommitted = TransactionIdIsValid(xid);
@@ -1397,7 +1397,7 @@ RecordTransactionCommit(void)
 		/*
 		 * Insert the commit XLOG record.
 		 */
-		XactLogCommitRecord(GetCurrentTransactionStopTimestamp(),
+		*commit_lsn = XactLogCommitRecord(GetCurrentTransactionStopTimestamp(),
 							nchildren, children, nrels, rels,
 							ndroppedstats, droppedstats,
 							nmsgs, invalMessages,
@@ -2153,6 +2153,7 @@ CommitTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
 	TransactionId latestXid;
+	XLogRecPtr	commit_lsn = InvalidXLogRecPtr;
 	bool		is_parallel_worker;
 
 	is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
@@ -2264,7 +2265,7 @@ CommitTransaction(void)
 		 * We need to mark our XIDs as committed in pg_xact.  This is where we
 		 * durably commit.
 		 */
-		latestXid = RecordTransactionCommit();
+		latestXid = RecordTransactionCommit(&commit_lsn);
 	}
 	else
 	{
@@ -2368,7 +2369,7 @@ CommitTransaction(void)
 	AtEOXact_Files(true);
 	AtEOXact_ComboCid();
 	AtEOXact_HashTables(true);
-	AtEOXact_PgStat(true, is_parallel_worker);
+	AtEOXact_PgStat(true, commit_lsn, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
 	AtEOXact_LogicalRepWorkers(true);
@@ -2869,7 +2870,7 @@ AbortTransaction(void)
 		AtEOXact_Files(false);
 		AtEOXact_ComboCid();
 		AtEOXact_HashTables(false);
-		AtEOXact_PgStat(false, is_parallel_worker);
+		AtEOXact_PgStat(false, InvalidXLogRecPtr, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
 		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c18fea8362..014d1d0433 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1029,6 +1029,7 @@ CREATE VIEW pg_stat_database AS
                     WHEN (D.oid = (0)::oid) THEN 0
                     ELSE pg_stat_get_db_numbackends(D.oid)
                 END AS numbackends,
+            pg_stat_get_db_last_commit_lsn(D.oid) AS last_commit_lsn,
             pg_stat_get_db_xact_commit(D.oid) AS xact_commit,
             pg_stat_get_db_xact_rollback(D.oid) AS xact_rollback,
             pg_stat_get_db_blocks_fetched(D.oid) -
diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c
index 7149f22f72..529cf0dee3 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -34,6 +34,7 @@ SessionEndType pgStatSessionEndCause = DISCONNECT_NORMAL;
 
 static int	pgStatXactCommit = 0;
 static int	pgStatXactRollback = 0;
+static XLogRecPtr	pgStatLastCommitLSN = InvalidXLogRecPtr;
 static PgStat_Counter pgLastSessionReportTime = 0;
 
 
@@ -246,7 +247,7 @@ pgstat_fetch_stat_dbentry(Oid dboid)
 }
 
 void
-AtEOXact_PgStat_Database(bool isCommit, bool parallel)
+AtEOXact_PgStat_Database(bool isCommit, XLogRecPtr commit_lsn, bool parallel)
 {
 	/* Don't count parallel worker transaction stats */
 	if (!parallel)
@@ -256,7 +257,12 @@ AtEOXact_PgStat_Database(bool isCommit, bool parallel)
 		 * bools, in case the reporting message isn't sent right away.)
 		 */
 		if (isCommit)
+		{
 			pgStatXactCommit++;
+			/* For commits also track the LSN of the commit record. */
+			if (!XLogRecPtrIsInvalid(commit_lsn))
+				pgStatLastCommitLSN = commit_lsn;
+		}
 		else
 			pgStatXactRollback++;
 	}
@@ -273,6 +279,13 @@ pgstat_update_dbstats(TimestampTz ts)
 
 	dbentry = pgstat_prep_database_pending(MyDatabaseId);
 
+	/*
+	 * pgStatLastCommitLSN is only ever set when there's a valid commit, so we
+	 * know the LSN must be valid, and since we're local to the current backend
+	 * we don't have to worry if we're advancing or not.
+	 */
+	dbentry->last_commit_lsn = pgStatLastCommitLSN;
+
 	/*
 	 * Accumulate xact commit/rollback and I/O timings to stats entry of the
 	 * current database.
@@ -304,6 +317,7 @@ pgstat_update_dbstats(TimestampTz ts)
 	pgStatBlockWriteTime = 0;
 	pgStatActiveTime = 0;
 	pgStatTransactionIdleTime = 0;
+	/* No reason to zero out pgStatLastCommitLSN; it's not accumulative. */
 }
 
 /*
@@ -414,6 +428,10 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 	PGSTAT_ACCUM_DBCOUNT(sessions_killed);
 #undef PGSTAT_ACCUM_DBCOUNT
 
+	/* Only update last_commit_lsn if our backend has the newest commit. */
+	if (pendingent->last_commit_lsn > sharedent->stats.last_commit_lsn)
+		sharedent->stats.last_commit_lsn = pendingent->last_commit_lsn;
+
 	pgstat_unlock_entry(entry_ref);
 
 	memset(pendingent, 0, sizeof(*pendingent));
diff --git a/src/backend/utils/activity/pgstat_xact.c b/src/backend/utils/activity/pgstat_xact.c
index 369239d501..2713b6a4e6 100644
--- a/src/backend/utils/activity/pgstat_xact.c
+++ b/src/backend/utils/activity/pgstat_xact.c
@@ -38,11 +38,11 @@ static PgStat_SubXactStatus *pgStatXactStack = NULL;
  * Called from access/transam/xact.c at top-level transaction commit/abort.
  */
 void
-AtEOXact_PgStat(bool isCommit, bool parallel)
+AtEOXact_PgStat(bool isCommit, XLogRecPtr commit_lsn, bool parallel)
 {
 	PgStat_SubXactStatus *xact_state;
 
-	AtEOXact_PgStat_Database(isCommit, parallel);
+	AtEOXact_PgStat_Database(isCommit, commit_lsn, parallel);
 
 	/* handle transactional stats information */
 	xact_state = pgStatXactStack;
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 49adc319fc..d1a4222458 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1065,6 +1065,9 @@ PG_STAT_GET_DBENTRY_INT64(tuples_returned)
 /* pg_stat_get_db_tuples_updated */
 PG_STAT_GET_DBENTRY_INT64(tuples_updated)
 
+/* pg_stat_get_db_last_commit_lsn */
+PG_STAT_GET_DBENTRY_INT64(last_commit_lsn)
+
 /* pg_stat_get_db_xact_commit */
 PG_STAT_GET_DBENTRY_INT64(xact_commit)
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6996073989..377a0a6ae0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5534,6 +5534,10 @@
   proname => 'pg_stat_get_db_numbackends', provolatile => 's',
   proparallel => 'r', prorettype => 'int4', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_numbackends' },
+{ oid => '8000', descr => 'statistics: wal location of last committed transaction',
+  proname => 'pg_stat_get_db_last_commit_lsn', provolatile => 's',
+  proparallel => 'r', prorettype => 'pg_lsn', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_last_commit_lsn' },
 { oid => '1942', descr => 'statistics: transactions committed',
   proname => 'pg_stat_get_db_xact_commit', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 57a2c0866a..c492759a71 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -18,6 +18,7 @@
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/relcache.h"
 #include "utils/wait_event.h"	/* for backward compatibility */
+#include "access/xlogdefs.h"
 
 
 /* ----------
@@ -320,6 +321,7 @@ typedef struct PgStat_IO
 
 typedef struct PgStat_StatDBEntry
 {
+	XLogRecPtr	last_commit_lsn;
 	PgStat_Counter xact_commit;
 	PgStat_Counter xact_rollback;
 	PgStat_Counter blocks_fetched;
@@ -702,7 +704,7 @@ extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
  * Functions in pgstat_xact.c
  */
 
-extern void AtEOXact_PgStat(bool isCommit, bool parallel);
+extern void AtEOXact_PgStat(bool isCommit, XLogRecPtr commit_lsn, bool parallel);
 extern void AtEOSubXact_PgStat(bool isCommit, int nestDepth);
 extern void AtPrepare_PgStat(void);
 extern void PostPrepare_PgStat(void);
diff --git a/src/include/utils/pgstat_internal.h b/src/include/utils/pgstat_internal.h
index 60fbf9394b..700d9c27bb 100644
--- a/src/include/utils/pgstat_internal.h
+++ b/src/include/utils/pgstat_internal.h
@@ -548,7 +548,7 @@ extern void pgstat_checkpointer_snapshot_cb(void);
 
 extern void pgstat_report_disconnect(Oid dboid);
 extern void pgstat_update_dbstats(TimestampTz ts);
-extern void AtEOXact_PgStat_Database(bool isCommit, bool parallel);
+extern void AtEOXact_PgStat_Database(bool isCommit, XLogRecPtr commit_lsn, bool parallel);
 
 extern PgStat_StatDBEntry *pgstat_prep_database_pending(Oid dboid);
 extern void pgstat_reset_database_timestamp(Oid dboid, TimestampTz ts);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 7fd81e6a7d..62b909ca92 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1833,6 +1833,7 @@ pg_stat_database| SELECT oid AS datid,
             WHEN (oid = (0)::oid) THEN 0
             ELSE pg_stat_get_db_numbackends(oid)
         END AS numbackends,
+    pg_stat_get_db_last_commit_lsn(oid) AS last_commit_lsn,
     pg_stat_get_db_xact_commit(oid) AS xact_commit,
     pg_stat_get_db_xact_rollback(oid) AS xact_rollback,
     (pg_stat_get_db_blocks_fetched(oid) - pg_stat_get_db_blocks_hit(oid)) AS blks_read,
diff --git a/src/test/regress/expected/stats.out b/src/test/regress/expected/stats.out
index 8e63340782..fb4dbad15d 100644
--- a/src/test/regress/expected/stats.out
+++ b/src/test/regress/expected/stats.out
@@ -782,6 +782,30 @@ SELECT sessions > :db_stat_sessions FROM pg_stat_database WHERE datname = (SELEC
  t
 (1 row)
 
+-- Test that last_commit_lsn is incremented when a transaction commits
+SELECT last_commit_lsn AS db_stat_last_commit_lsn FROM pg_stat_database WHERE datname = (SELECT current_database()) \gset
+BEGIN;
+CREATE TABLE test_commit_increments(i int);
+COMMIT;
+SELECT pg_stat_force_next_flush();
+ pg_stat_force_next_flush 
+--------------------------
+ 
+(1 row)
+
+SELECT last_commit_lsn > :'db_stat_last_commit_lsn'::pg_lsn FROM pg_stat_database WHERE datname = (SELECT current_database());
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT :'db_stat_last_commit_lsn'::pg_lsn < pg_current_wal_insert_lsn();
+ ?column? 
+----------
+ t
+(1 row)
+
+DROP TABLE test_commit_increments;
 -- Test pg_stat_bgwriter checkpointer-related stats, together with pg_stat_wal
 SELECT checkpoints_req AS rqst_ckpts_before FROM pg_stat_bgwriter \gset
 -- Test pg_stat_wal (and make a temp table so our temp schema exists)
diff --git a/src/test/regress/sql/stats.sql b/src/test/regress/sql/stats.sql
index fddf5a8277..7f6180826b 100644
--- a/src/test/regress/sql/stats.sql
+++ b/src/test/regress/sql/stats.sql
@@ -387,6 +387,16 @@ SELECT sessions AS db_stat_sessions FROM pg_stat_database WHERE datname = (SELEC
 SELECT pg_stat_force_next_flush();
 SELECT sessions > :db_stat_sessions FROM pg_stat_database WHERE datname = (SELECT current_database());
 
+-- Test that last_commit_lsn is incremented when a transaction commits
+SELECT last_commit_lsn AS db_stat_last_commit_lsn FROM pg_stat_database WHERE datname = (SELECT current_database()) \gset
+BEGIN;
+CREATE TABLE test_commit_increments(i int);
+COMMIT;
+SELECT pg_stat_force_next_flush();
+SELECT last_commit_lsn > :'db_stat_last_commit_lsn'::pg_lsn FROM pg_stat_database WHERE datname = (SELECT current_database());
+SELECT :'db_stat_last_commit_lsn'::pg_lsn < pg_current_wal_insert_lsn();
+DROP TABLE test_commit_increments;
+
 -- Test pg_stat_bgwriter checkpointer-related stats, together with pg_stat_wal
 SELECT checkpoints_req AS rqst_ckpts_before FROM pg_stat_bgwriter \gset
 
-- 
2.39.3 (Apple Git-145)

