From 5d75d93701bd268e7689791042405450297fede9 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Date: Mon, 5 Jan 2026 09:41:39 +0000
Subject: [PATCH v12 1/1] Add periodic in-transaction stats flushing

Some statistics, such as relation scan counts, blocks fetched/hit,
WAL, IO, and SLRU stats, can safely be flushed to shared memory
mid-transaction.  Others, like tuple insert/update/delete counts,
should only be flushed at transaction boundaries.

Introduce a flush_mode field (FLUSH_IN_TRANSACTION and
FLUSH_AT_TXN_BOUNDARY) fo each stats kind. FLUSH_IN_TRANSACTION will
be flushed every PGSTAT_IDLE_TXN_INTERVAL (10s) while idle in transaction.

The timeout is set each time the transaction goes idle between
commands, not on a fixed repeating interval.  It is disabled when
the session goes idle after the transaction ends.
---
 doc/src/sgml/monitoring.sgml                  |  28 ++++-
 src/backend/replication/walsender.c           |   8 +-
 src/backend/tcop/postgres.c                   |  43 +++++++
 src/backend/utils/activity/pgstat.c           | 109 ++++++++++++++---
 src/backend/utils/activity/pgstat_backend.c   |   6 +-
 src/backend/utils/activity/pgstat_bgwriter.c  |   2 +-
 .../utils/activity/pgstat_checkpointer.c      |   2 +-
 src/backend/utils/activity/pgstat_database.c  |   6 +-
 src/backend/utils/activity/pgstat_function.c  |   8 +-
 src/backend/utils/activity/pgstat_io.c        |   6 +-
 src/backend/utils/activity/pgstat_relation.c  |  98 ++++++++++++---
 src/backend/utils/activity/pgstat_slru.c      |   2 +-
 .../utils/activity/pgstat_subscription.c      |   8 +-
 src/backend/utils/activity/pgstat_wal.c       |  10 +-
 src/backend/utils/init/globals.c              |   1 +
 src/backend/utils/init/postinit.c             |  12 ++
 src/include/miscadmin.h                       |   1 +
 src/include/pgstat.h                          |   8 ++
 src/include/utils/pgstat_internal.h           |  58 +++++++--
 src/include/utils/timeout.h                   |   1 +
 .../test_custom_stats/t/001_custom_stats.pl   |  59 +++++++++
 .../test_custom_var_stats--1.0.sql            |  10 ++
 .../test_custom_stats/test_custom_var_stats.c | 114 +++++++++++++++++-
 src/test/modules/test_misc/meson.build        |   1 +
 .../test_misc/t/011_in_transaction_stats.pl   |  91 ++++++++++++++
 src/tools/pgindent/typedefs.list              |   1 +
 26 files changed, 611 insertions(+), 82 deletions(-)
 create mode 100644 src/test/modules/test_misc/t/011_in_transaction_stats.pl

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 462019a972c..dcdeb9aabec 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -235,15 +235,31 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
    When using the cumulative statistics views and functions to monitor
    collected data, it is important to realize that the information does not
    update instantaneously.  Each individual server process flushes out
-   accumulated statistics to shared memory just before going idle, but not
-   more frequently than once per <varname>PGSTAT_MIN_INTERVAL</varname>
-   milliseconds (1 second unless altered while building the server); so a
-   query or transaction still in progress does not affect the displayed totals
-   and the displayed information lags behind actual activity.  However,
-   current-query information collected by <varname>track_activities</varname>
+   accumulated statistics to shared memory just before going idle or
+   periodically during transactions, depending on whether the statistics
+   should only be flushed at a transaction boundary, as described below.
+   Current-query information collected by <varname>track_activities</varname>
    is always up-to-date.
   </para>
 
+  <para>
+   When a server process is about to become idle, it flushes all pending
+   statistics to shared memory, but not more frequently than once per
+   <varname>PGSTAT_MIN_INTERVAL</varname> milliseconds (1 second unless
+   altered while building the server).
+  </para>
+
+  <para>
+   During explicit transaction blocks, statistics that can be flushed at any
+   time, such as relation scan counts, tuples returned and fetched, blocks
+   fetched and hit, as well as IO, WAL, SLRU, and backend statistics, are
+   periodically flushed to shared memory every
+   <varname>PGSTAT_IDLE_TXN_INTERVAL</varname> milliseconds (10 seconds
+   unless altered while building the server).  This periodic flushing occurs
+   when the transaction goes idle between commands.  It does not apply to
+   single statements running outside an explicit transaction block.
+  </para>
+
   <para>
    Another important point is that when a server process is asked to display
    any of the accumulated statistics, accessed values are cached until the end
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 376ff46340d..24124d478e8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1993,8 +1993,8 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (TimestampDifferenceExceeds(last_flush, now,
 									   WALSENDER_STATS_FLUSH_INTERVAL))
 		{
-			pgstat_flush_io(false);
-			(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+			pgstat_flush_io(false, true);
+			(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO, true);
 			last_flush = now;
 		}
 
@@ -3022,8 +3022,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			if (TimestampDifferenceExceeds(last_flush, now,
 										   WALSENDER_STATS_FLUSH_INTERVAL))
 			{
-				pgstat_flush_io(false);
-				(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+				pgstat_flush_io(false, true);
+				(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO, true);
 				last_flush = now;
 			}
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index b3563113219..00436fee0de 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3565,6 +3565,12 @@ ProcessInterrupts(void)
 		pgstat_report_stat(true);
 	}
 
+	if (InTransactionStatsUpdateTimeoutPending)
+	{
+		InTransactionStatsUpdateTimeoutPending = false;
+		pgstat_report_in_transaction_stat(false);
+	}
+
 	if (ProcSignalBarrierPending)
 		ProcessProcSignalBarrier();
 
@@ -4621,6 +4627,21 @@ PostgresMain(const char *dbname, const char *username)
 					enable_timeout_after(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
 										 IdleInTransactionSessionTimeout);
 				}
+
+				/*
+				 * Flush in-transaction stats.
+				 */
+				if (!get_timeout_active(IN_TRANSACTION_STATS_UPDATE_TIMEOUT))
+				{
+					int			stats_interval = PGSTAT_IDLE_TXN_INTERVAL;
+
+#ifdef USE_INJECTION_POINTS
+					if (IS_INJECTION_POINT_ATTACHED("in-transaction-stats-short-interval"))
+						stats_interval = 1000;
+#endif
+					enable_timeout_after(IN_TRANSACTION_STATS_UPDATE_TIMEOUT,
+										 stats_interval);
+				}
 			}
 			else if (IsTransactionOrTransactionBlock())
 			{
@@ -4635,6 +4656,21 @@ PostgresMain(const char *dbname, const char *username)
 					enable_timeout_after(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
 										 IdleInTransactionSessionTimeout);
 				}
+
+				/*
+				 * Flush in-transaction stats.
+				 */
+				if (!get_timeout_active(IN_TRANSACTION_STATS_UPDATE_TIMEOUT))
+				{
+					int			stats_interval = PGSTAT_IDLE_TXN_INTERVAL;
+
+#ifdef USE_INJECTION_POINTS
+					if (IS_INJECTION_POINT_ATTACHED("in-transaction-stats-short-interval"))
+						stats_interval = 1000;
+#endif
+					enable_timeout_after(IN_TRANSACTION_STATS_UPDATE_TIMEOUT,
+										 stats_interval);
+				}
 			}
 			else
 			{
@@ -4650,6 +4686,13 @@ PostgresMain(const char *dbname, const char *username)
 				if (notifyInterruptPending)
 					ProcessNotifyInterrupt(false);
 
+				/*
+				 * We are no longer in a transaction, so disable the
+				 * in-transaction stats flush timeout if it was active.
+				 */
+				if (get_timeout_active(IN_TRANSACTION_STATS_UPDATE_TIMEOUT))
+					disable_timeout(IN_TRANSACTION_STATS_UPDATE_TIMEOUT, false);
+
 				/*
 				 * Check if we need to report stats. If pgstat_report_stat()
 				 * decides it's too soon to flush out pending stats / lock
diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c
index 11bb71cad5a..70090565fee 100644
--- a/src/backend/utils/activity/pgstat.c
+++ b/src/backend/utils/activity/pgstat.c
@@ -187,7 +187,8 @@ static void pgstat_init_snapshot_fixed(void);
 
 static void pgstat_reset_after_failure(void);
 
-static bool pgstat_flush_pending_entries(bool nowait);
+static bool pgstat_flush_pending_entries(bool nowait, bool in_txn_only);
+static bool pgstat_flush_fixed_stats(bool nowait, bool in_txn_only);
 
 static void pgstat_prep_snapshot(void);
 static void pgstat_build_snapshot(void);
@@ -288,6 +289,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE]
 
 		.fixed_amount = false,
 		.write_to_file = true,
+		.flush_mode = FLUSH_IN_TRANSACTION,
 		/* so pg_stat_database entries can be seen in all databases */
 		.accessed_across_databases = true,
 
@@ -305,6 +307,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE]
 
 		.fixed_amount = false,
 		.write_to_file = true,
+		.flush_mode = FLUSH_IN_TRANSACTION,
 
 		.shared_size = sizeof(PgStatShared_Relation),
 		.shared_data_off = offsetof(PgStatShared_Relation, stats),
@@ -321,6 +324,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE]
 
 		.fixed_amount = false,
 		.write_to_file = true,
+		.flush_mode = FLUSH_AT_TXN_BOUNDARY,
 
 		.shared_size = sizeof(PgStatShared_Function),
 		.shared_data_off = offsetof(PgStatShared_Function, stats),
@@ -336,6 +340,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE]
 
 		.fixed_amount = false,
 		.write_to_file = true,
+		.flush_mode = FLUSH_AT_TXN_BOUNDARY,
 
 		.accessed_across_databases = true,
 
@@ -353,6 +358,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE]
 
 		.fixed_amount = false,
 		.write_to_file = true,
+		.flush_mode = FLUSH_AT_TXN_BOUNDARY,
 		/* so pg_stat_subscription_stats entries can be seen in all databases */
 		.accessed_across_databases = true,
 
@@ -370,6 +376,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE]
 
 		.fixed_amount = false,
 		.write_to_file = false,
+		.flush_mode = FLUSH_IN_TRANSACTION,
 
 		.accessed_across_databases = true,
 
@@ -436,6 +443,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE]
 
 		.fixed_amount = true,
 		.write_to_file = true,
+		.flush_mode = FLUSH_IN_TRANSACTION,
 
 		.snapshot_ctl_off = offsetof(PgStat_Snapshot, io),
 		.shared_ctl_off = offsetof(PgStat_ShmemControl, io),
@@ -453,6 +461,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE]
 
 		.fixed_amount = true,
 		.write_to_file = true,
+		.flush_mode = FLUSH_IN_TRANSACTION,
 
 		.snapshot_ctl_off = offsetof(PgStat_Snapshot, slru),
 		.shared_ctl_off = offsetof(PgStat_ShmemControl, slru),
@@ -470,6 +479,7 @@ static const PgStat_KindInfo pgstat_kind_builtin_infos[PGSTAT_KIND_BUILTIN_SIZE]
 
 		.fixed_amount = true,
 		.write_to_file = true,
+		.flush_mode = FLUSH_IN_TRANSACTION,
 
 		.snapshot_ctl_off = offsetof(PgStat_Snapshot, wal),
 		.shared_ctl_off = offsetof(PgStat_ShmemControl, wal),
@@ -775,23 +785,11 @@ pgstat_report_stat(bool force)
 	partial_flush = false;
 
 	/* flush of variable-numbered stats tracked in pending entries list */
-	partial_flush |= pgstat_flush_pending_entries(nowait);
+	partial_flush |= pgstat_flush_pending_entries(nowait, false);
 
 	/* flush of other stats kinds */
 	if (pgstat_report_fixed)
-	{
-		for (PgStat_Kind kind = PGSTAT_KIND_MIN; kind <= PGSTAT_KIND_MAX; kind++)
-		{
-			const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
-
-			if (!kind_info)
-				continue;
-			if (!kind_info->flush_static_cb)
-				continue;
-
-			partial_flush |= kind_info->flush_static_cb(nowait);
-		}
-	}
+		partial_flush |= pgstat_flush_fixed_stats(nowait, false);
 
 	last_flush = now;
 
@@ -1293,7 +1291,8 @@ pgstat_prep_pending_entry(PgStat_Kind kind, Oid dboid, uint64 objid, bool *creat
 
 	if (entry_ref->pending == NULL)
 	{
-		size_t		entrysize = pgstat_get_kind_info(kind)->pending_size;
+		const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
+		size_t		entrysize = kind_info->pending_size;
 
 		Assert(entrysize != (size_t) -1);
 
@@ -1345,9 +1344,15 @@ pgstat_delete_pending_entry(PgStat_EntryRef *entry_ref)
 
 /*
  * Flush out pending variable-numbered stats.
+ *
+ * If in_txn_only is true, only flushes FLUSH_IN_TRANSACTION entries. For entries
+ * that support it, the callback may flush only non-transactional fields.
+ * This is safe to call inside transactions.
+ *
+ * If in_txn_only is false, flushes all entries.
  */
 static bool
-pgstat_flush_pending_entries(bool nowait)
+pgstat_flush_pending_entries(bool nowait, bool in_txn_only)
 {
 	bool		have_pending = false;
 	dlist_node *cur = NULL;
@@ -1372,13 +1377,29 @@ pgstat_flush_pending_entries(bool nowait)
 		PgStat_Kind kind = key.kind;
 		const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
 		bool		did_flush;
+		bool		is_partial_flush = false;
 		dlist_node *next;
 
 		Assert(!kind_info->fixed_amount);
 		Assert(kind_info->flush_pending_cb != NULL);
 
+		/* Skip transactional stats if we're in in_txn_only mode */
+		if (in_txn_only && kind_info->flush_mode == FLUSH_AT_TXN_BOUNDARY)
+		{
+			have_pending = true;
+
+			if (dlist_has_next(&pgStatPending, cur))
+				next = dlist_next_node(&pgStatPending, cur);
+			else
+				next = NULL;
+
+			cur = next;
+			continue;
+		}
+
 		/* flush the stats, if possible */
-		did_flush = kind_info->flush_pending_cb(entry_ref, nowait);
+		did_flush = kind_info->flush_pending_cb(entry_ref, nowait,
+												in_txn_only, &is_partial_flush);
 
 		Assert(did_flush || nowait);
 
@@ -1388,8 +1409,8 @@ pgstat_flush_pending_entries(bool nowait)
 		else
 			next = NULL;
 
-		/* if successfully flushed, remove entry */
-		if (did_flush)
+		/* if successfull non-partial flush, remove entry */
+		if (did_flush && !is_partial_flush)
 			pgstat_delete_pending_entry(entry_ref);
 		else
 			have_pending = true;
@@ -1402,6 +1423,33 @@ pgstat_flush_pending_entries(bool nowait)
 	return have_pending;
 }
 
+/*
+ * Flush fixed-amount stats.
+ *
+ * If in_txn_only is true, only flushes FLUSH_IN_TRANSACTION stats (safe inside transactions).
+ * If in_txn_only is false, flushes all stats with flush_static_cb.
+ */
+static bool
+pgstat_flush_fixed_stats(bool nowait, bool in_txn_only)
+{
+	bool		partial_flush = false;
+
+	for (PgStat_Kind kind = PGSTAT_KIND_MIN; kind <= PGSTAT_KIND_MAX; kind++)
+	{
+		const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
+
+		if (!kind_info || !kind_info->flush_static_cb)
+			continue;
+
+		/* Skip transactional stats if we're in in_txn_only mode */
+		if (in_txn_only && kind_info->flush_mode == FLUSH_AT_TXN_BOUNDARY)
+			continue;
+
+		partial_flush |= kind_info->flush_static_cb(nowait, in_txn_only);
+	}
+
+	return partial_flush;
+}
 
 /* ------------------------------------------------------------
  * Helper / infrastructure functions
@@ -2119,3 +2167,24 @@ assign_stats_fetch_consistency(int newval, void *extra)
 	if (pgstat_fetch_consistency != newval)
 		force_stats_snapshot_clear = true;
 }
+
+/*
+ * Flushes only FLUSH_IN_TRANSACTION stats using non-blocking locks. Transactional
+ * stats (FLUSH_AT_TXN_BOUNDARY) remain pending until transaction boundary.
+ * Safe to call inside transactions.
+ *
+ * This is called from a timeout handler every PGSTAT_IDLE_TXN_INTERVAL
+ * (10000ms) while idle in transaction, to ensure non-transactional stats
+ * (e.g., IO, backend) are flushed periodically even during long transactions.
+ */
+void
+pgstat_report_in_transaction_stat(bool force)
+{
+	bool		nowait = !force;
+
+	pgstat_assert_is_up();
+
+	/* Flush stats outside of transaction boundary */
+	pgstat_flush_pending_entries(nowait, true);
+	pgstat_flush_fixed_stats(nowait, true);
+}
diff --git a/src/backend/utils/activity/pgstat_backend.c b/src/backend/utils/activity/pgstat_backend.c
index f2f8d3ff75f..3be8b2705b1 100644
--- a/src/backend/utils/activity/pgstat_backend.c
+++ b/src/backend/utils/activity/pgstat_backend.c
@@ -268,7 +268,7 @@ pgstat_flush_backend_entry_wal(PgStat_EntryRef *entry_ref)
  * if some statistics could not be flushed due to lock contention.
  */
 bool
-pgstat_flush_backend(bool nowait, bits32 flags)
+pgstat_flush_backend(bool nowait, bits32 flags, bool in_txn_only)
 {
 	PgStat_EntryRef *entry_ref;
 	bool		has_pending_data = false;
@@ -311,9 +311,9 @@ pgstat_flush_backend(bool nowait, bits32 flags)
  * If some stats could not be flushed due to lock contention, return true.
  */
 bool
-pgstat_backend_flush_cb(bool nowait)
+pgstat_backend_flush_cb(bool nowait, bool in_txn_only)
 {
-	return pgstat_flush_backend(nowait, PGSTAT_BACKEND_FLUSH_ALL);
+	return pgstat_flush_backend(nowait, PGSTAT_BACKEND_FLUSH_ALL, in_txn_only);
 }
 
 /*
diff --git a/src/backend/utils/activity/pgstat_bgwriter.c b/src/backend/utils/activity/pgstat_bgwriter.c
index ed2fd801189..1c5f0c3ec40 100644
--- a/src/backend/utils/activity/pgstat_bgwriter.c
+++ b/src/backend/utils/activity/pgstat_bgwriter.c
@@ -61,7 +61,7 @@ pgstat_report_bgwriter(void)
 	/*
 	 * Report IO statistics
 	 */
-	pgstat_flush_io(false);
+	pgstat_flush_io(false, true);
 }
 
 /*
diff --git a/src/backend/utils/activity/pgstat_checkpointer.c b/src/backend/utils/activity/pgstat_checkpointer.c
index 1f70194b7a7..2d89a082464 100644
--- a/src/backend/utils/activity/pgstat_checkpointer.c
+++ b/src/backend/utils/activity/pgstat_checkpointer.c
@@ -68,7 +68,7 @@ pgstat_report_checkpointer(void)
 	/*
 	 * Report IO statistics
 	 */
-	pgstat_flush_io(false);
+	pgstat_flush_io(false, true);
 }
 
 /*
diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c
index 933dcb5cae5..72d757a6631 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -435,7 +435,8 @@ pgstat_reset_database_timestamp(Oid dboid, TimestampTz ts)
  * false without flushing the entry.  Otherwise returns true.
  */
 bool
-pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
+pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait,
+						 bool in_txn_only, bool *is_partial)
 {
 	PgStatShared_Database *sharedent;
 	PgStat_StatDBEntry *pendingent;
@@ -443,6 +444,9 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 	pendingent = (PgStat_StatDBEntry *) entry_ref->pending;
 	sharedent = (PgStatShared_Database *) entry_ref->shared_stats;
 
+	/* this is not a partial flush */
+	*is_partial = false;
+
 	if (!pgstat_lock_entry(entry_ref, nowait))
 		return false;
 
diff --git a/src/backend/utils/activity/pgstat_function.c b/src/backend/utils/activity/pgstat_function.c
index e6b84283c6c..d51371d7967 100644
--- a/src/backend/utils/activity/pgstat_function.c
+++ b/src/backend/utils/activity/pgstat_function.c
@@ -190,14 +190,20 @@ pgstat_end_function_usage(PgStat_FunctionCallUsage *fcu, bool finalize)
  * false without flushing the entry.  Otherwise returns true.
  */
 bool
-pgstat_function_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
+pgstat_function_flush_cb(PgStat_EntryRef *entry_ref, bool nowait,
+						 bool in_txn_only, bool *is_partial)
 {
 	PgStat_FunctionCounts *localent;
 	PgStatShared_Function *shfuncent;
 
+	Assert(!in_txn_only);
+
 	localent = (PgStat_FunctionCounts *) entry_ref->pending;
 	shfuncent = (PgStatShared_Function *) entry_ref->shared_stats;
 
+	/* this is not a partial flush */
+	*is_partial = false;
+
 	/* localent always has non-zero content */
 
 	if (!pgstat_lock_entry(entry_ref, nowait))
diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c
index 28de24538dc..ec541bf3f78 100644
--- a/src/backend/utils/activity/pgstat_io.c
+++ b/src/backend/utils/activity/pgstat_io.c
@@ -172,9 +172,9 @@ pgstat_fetch_stat_io(void)
  * Simpler wrapper of pgstat_io_flush_cb()
  */
 void
-pgstat_flush_io(bool nowait)
+pgstat_flush_io(bool nowait, bool in_txn_only)
 {
-	(void) pgstat_io_flush_cb(nowait);
+	(void) pgstat_io_flush_cb(nowait, in_txn_only);
 }
 
 /*
@@ -186,7 +186,7 @@ pgstat_flush_io(bool nowait)
  * acquired. Otherwise, return false.
  */
 bool
-pgstat_io_flush_cb(bool nowait)
+pgstat_io_flush_cb(bool nowait, bool in_txn_only)
 {
 	LWLock	   *bktype_lock;
 	PgStat_BktypeIO *bktype_shstats;
diff --git a/src/backend/utils/activity/pgstat_relation.c b/src/backend/utils/activity/pgstat_relation.c
index bc8c43b96aa..33359cf07b0 100644
--- a/src/backend/utils/activity/pgstat_relation.c
+++ b/src/backend/utils/activity/pgstat_relation.c
@@ -47,7 +47,19 @@ static void add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_lev
 static void ensure_tabstat_xact_level(PgStat_TableStatus *pgstat_info);
 static void save_truncdrop_counters(PgStat_TableXactStatus *trans, bool is_drop);
 static void restore_truncdrop_counters(PgStat_TableXactStatus *trans);
+static void flush_relation_in_transaction_stats(PgStat_StatTabEntry *tabentry,
+												PgStat_TableCounts *counts, bool in_txn_only);
 
+/*
+ * Update database statistics with non-transactional stats.
+ */
+#define UPDATE_DATABASE_IN_TRANSACTION_STATS(dbentry, counts)				\
+	do {															\
+		(dbentry)->tuples_returned += (counts)->tuples_returned;	\
+		(dbentry)->tuples_fetched += (counts)->tuples_fetched;		\
+		(dbentry)->blocks_fetched += (counts)->blocks_fetched;		\
+		(dbentry)->blocks_hit += (counts)->blocks_hit;				\
+	} while (0)
 
 /*
  * Copy stats between relations. This is used for things like REINDEX
@@ -267,8 +279,8 @@ pgstat_report_vacuum(Relation rel, PgStat_Counter livetuples,
 	 * is done -- which will likely vacuum many relations -- or until the
 	 * VACUUM command has processed all tables and committed.
 	 */
-	pgstat_flush_io(false);
-	(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+	pgstat_flush_io(false, true);
+	(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO, true);
 }
 
 /*
@@ -362,8 +374,8 @@ pgstat_report_analyze(Relation rel,
 	pgstat_unlock_entry(entry_ref);
 
 	/* see pgstat_report_vacuum() */
-	pgstat_flush_io(false);
-	(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+	pgstat_flush_io(false, true);
+	(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO, true);
 }
 
 /*
@@ -802,6 +814,29 @@ pgstat_twophase_postabort(FullTransactionId fxid, uint16 info,
 		rec->tuples_inserted + rec->tuples_updated;
 }
 
+/*
+ * Helper function to flush non-transactional statistics.
+ */
+static void
+flush_relation_in_transaction_stats(PgStat_StatTabEntry *tabentry, PgStat_TableCounts *counts,
+									bool in_txn_only)
+{
+	TimestampTz t;
+
+	tabentry->numscans += counts->numscans;
+	if (counts->numscans)
+	{
+		t = in_txn_only ? GetCurrentTimestamp() : GetCurrentTransactionStopTimestamp();
+		if (t > tabentry->lastscan)
+			tabentry->lastscan = t;
+	}
+
+	tabentry->tuples_returned += counts->tuples_returned;
+	tabentry->tuples_fetched += counts->tuples_fetched;
+	tabentry->blocks_fetched += counts->blocks_fetched;
+	tabentry->blocks_hit += counts->blocks_hit;
+}
+
 /*
  * Flush out pending stats for the entry
  *
@@ -810,9 +845,17 @@ pgstat_twophase_postabort(FullTransactionId fxid, uint16 info,
  *
  * Some of the stats are copied to the corresponding pending database stats
  * entry when successfully flushing.
+ *
+ * If in_txn_only is true, only non-transactional fields are flushed
+ * (numscans, tuples_returned, tuples_fetched, blocks_fetched, blocks_hit).
+ * Transactional fields remain pending until transaction boundary.
+ *
+ * Some of the stats are copied to the corresponding pending database stats
+ * entry when successfully flushing.
  */
 bool
-pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
+pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait,
+						 bool in_txn_only, bool *is_partial)
 {
 	Oid			dboid;
 	PgStat_TableStatus *lstats; /* pending stats entry  */
@@ -824,6 +867,9 @@ pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 	lstats = (PgStat_TableStatus *) entry_ref->pending;
 	shtabstats = (PgStatShared_Relation *) entry_ref->shared_stats;
 
+	/* this is a partial flush if in in_txn_only mode */
+	*is_partial = in_txn_only;
+
 	/*
 	 * Ignore entries that didn't accumulate any actual counts, such as
 	 * indexes that were opened by the planner but not used.
@@ -835,19 +881,36 @@ pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 	if (!pgstat_lock_entry(entry_ref, nowait))
 		return false;
 
-	/* add the values to the shared entry. */
 	tabentry = &shtabstats->stats;
 
-	tabentry->numscans += lstats->counts.numscans;
-	if (lstats->counts.numscans)
+	if (in_txn_only)
 	{
-		TimestampTz t = GetCurrentTransactionStopTimestamp();
 
-		if (t > tabentry->lastscan)
-			tabentry->lastscan = t;
+		/* Flush non-transactional statistics */
+		flush_relation_in_transaction_stats(tabentry, &lstats->counts, true);
+
+		pgstat_unlock_entry(entry_ref);
+
+		/* Also update the corresponding fields in database stats */
+		dbentry = pgstat_prep_database_pending(dboid);
+		UPDATE_DATABASE_IN_TRANSACTION_STATS(dbentry, &lstats->counts);
+
+		/*
+		 * Clear the flushed fields from pending stats to prevent
+		 * double-counting when we flush all fields at transaction boundary.
+		 */
+		lstats->counts.numscans = 0;
+		lstats->counts.tuples_returned = 0;
+		lstats->counts.tuples_fetched = 0;
+		lstats->counts.blocks_fetched = 0;
+		lstats->counts.blocks_hit = 0;
+
+		return true;
 	}
-	tabentry->tuples_returned += lstats->counts.tuples_returned;
-	tabentry->tuples_fetched += lstats->counts.tuples_fetched;
+
+	/* Flush non-transactional statistics */
+	flush_relation_in_transaction_stats(tabentry, &lstats->counts, false);
+
 	tabentry->tuples_inserted += lstats->counts.tuples_inserted;
 	tabentry->tuples_updated += lstats->counts.tuples_updated;
 	tabentry->tuples_deleted += lstats->counts.tuples_deleted;
@@ -877,9 +940,6 @@ pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 	 */
 	tabentry->ins_since_vacuum += lstats->counts.tuples_inserted;
 
-	tabentry->blocks_fetched += lstats->counts.blocks_fetched;
-	tabentry->blocks_hit += lstats->counts.blocks_hit;
-
 	/* Clamp live_tuples in case of negative delta_live_tuples */
 	tabentry->live_tuples = Max(tabentry->live_tuples, 0);
 	/* Likewise for dead_tuples */
@@ -889,13 +949,11 @@ pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 
 	/* The entry was successfully flushed, add the same to database stats */
 	dbentry = pgstat_prep_database_pending(dboid);
-	dbentry->tuples_returned += lstats->counts.tuples_returned;
-	dbentry->tuples_fetched += lstats->counts.tuples_fetched;
+	UPDATE_DATABASE_IN_TRANSACTION_STATS(dbentry, &lstats->counts);
+
 	dbentry->tuples_inserted += lstats->counts.tuples_inserted;
 	dbentry->tuples_updated += lstats->counts.tuples_updated;
 	dbentry->tuples_deleted += lstats->counts.tuples_deleted;
-	dbentry->blocks_fetched += lstats->counts.blocks_fetched;
-	dbentry->blocks_hit += lstats->counts.blocks_hit;
 
 	return true;
 }
diff --git a/src/backend/utils/activity/pgstat_slru.c b/src/backend/utils/activity/pgstat_slru.c
index 2190f388eae..d8dc7be11ef 100644
--- a/src/backend/utils/activity/pgstat_slru.c
+++ b/src/backend/utils/activity/pgstat_slru.c
@@ -139,7 +139,7 @@ pgstat_get_slru_index(const char *name)
  * acquired. Otherwise return false.
  */
 bool
-pgstat_slru_flush_cb(bool nowait)
+pgstat_slru_flush_cb(bool nowait, bool in_txn_only)
 {
 	PgStatShared_SLRU *stats_shmem = &pgStatLocal.shmem->slru;
 	int			i;
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index 3277cf88a4e..4f289683d33 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -117,14 +117,20 @@ pgstat_fetch_stat_subscription(Oid subid)
  * false without flushing the entry.  Otherwise returns true.
  */
 bool
-pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
+pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait,
+							 bool in_txn_only, bool *is_partial)
 {
 	PgStat_BackendSubEntry *localent;
 	PgStatShared_Subscription *shsubent;
 
+	Assert(!in_txn_only);
+
 	localent = (PgStat_BackendSubEntry *) entry_ref->pending;
 	shsubent = (PgStatShared_Subscription *) entry_ref->shared_stats;
 
+	/* this is not a partial flush */
+	*is_partial = false;
+
 	/* localent always has non-zero content */
 
 	if (!pgstat_lock_entry(entry_ref, nowait))
diff --git a/src/backend/utils/activity/pgstat_wal.c b/src/backend/utils/activity/pgstat_wal.c
index 183e0a7a97b..bf796ad655e 100644
--- a/src/backend/utils/activity/pgstat_wal.c
+++ b/src/backend/utils/activity/pgstat_wal.c
@@ -51,12 +51,12 @@ pgstat_report_wal(bool force)
 	nowait = !force;
 
 	/* flush wal stats */
-	(void) pgstat_wal_flush_cb(nowait);
-	pgstat_flush_backend(nowait, PGSTAT_BACKEND_FLUSH_WAL);
+	(void) pgstat_wal_flush_cb(nowait, true);
+	(void) pgstat_flush_backend(nowait, PGSTAT_BACKEND_FLUSH_WAL, true);
 
 	/* flush IO stats */
-	pgstat_flush_io(nowait);
-	(void) pgstat_flush_backend(nowait, PGSTAT_BACKEND_FLUSH_IO);
+	pgstat_flush_io(nowait, true);
+	(void) pgstat_flush_backend(nowait, PGSTAT_BACKEND_FLUSH_IO, true);
 }
 
 /*
@@ -88,7 +88,7 @@ pgstat_wal_have_pending(void)
  * acquired. Otherwise return false.
  */
 bool
-pgstat_wal_flush_cb(bool nowait)
+pgstat_wal_flush_cb(bool nowait, bool in_txn_only)
 {
 	PgStatShared_Wal *stats_shmem = &pgStatLocal.shmem->wal;
 	WalUsage	wal_usage_diff = {0};
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 36ad708b360..bcce54367d5 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -40,6 +40,7 @@ volatile sig_atomic_t IdleSessionTimeoutPending = false;
 volatile sig_atomic_t ProcSignalBarrierPending = false;
 volatile sig_atomic_t LogMemoryContextPending = false;
 volatile sig_atomic_t IdleStatsUpdateTimeoutPending = false;
+volatile sig_atomic_t InTransactionStatsUpdateTimeoutPending = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
 volatile uint32 CritSectionCount = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 26118661f07..27ecf0df16e 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -65,6 +65,7 @@
 #include "utils/injection_point.h"
 #include "utils/memutils.h"
 #include "utils/pg_locale.h"
+#include "utils/pgstat_internal.h"
 #include "utils/portal.h"
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
@@ -89,6 +90,7 @@ static void IdleInTransactionSessionTimeoutHandler(void);
 static void TransactionTimeoutHandler(void);
 static void IdleSessionTimeoutHandler(void);
 static void IdleStatsUpdateTimeoutHandler(void);
+static void InTransactionStatsUpdateTimeoutHandler(void);
 static void ClientCheckTimeoutHandler(void);
 static bool ThereIsAtLeastOneRole(void);
 static void process_startup_options(Port *port, bool am_superuser);
@@ -774,6 +776,8 @@ InitPostgres(const char *in_dbname, Oid dboid,
 		RegisterTimeout(CLIENT_CONNECTION_CHECK_TIMEOUT, ClientCheckTimeoutHandler);
 		RegisterTimeout(IDLE_STATS_UPDATE_TIMEOUT,
 						IdleStatsUpdateTimeoutHandler);
+		RegisterTimeout(IN_TRANSACTION_STATS_UPDATE_TIMEOUT,
+						InTransactionStatsUpdateTimeoutHandler);
 	}
 
 	/*
@@ -1433,6 +1437,14 @@ IdleStatsUpdateTimeoutHandler(void)
 	SetLatch(MyLatch);
 }
 
+static void
+InTransactionStatsUpdateTimeoutHandler(void)
+{
+	InTransactionStatsUpdateTimeoutPending = true;
+	InterruptPending = true;
+	SetLatch(MyLatch);
+}
+
 static void
 ClientCheckTimeoutHandler(void)
 {
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index f16f35659b9..7051e4b6f3f 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -96,6 +96,7 @@ extern PGDLLIMPORT volatile sig_atomic_t IdleSessionTimeoutPending;
 extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending;
 extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending;
 extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending;
+extern PGDLLIMPORT volatile sig_atomic_t InTransactionStatsUpdateTimeoutPending;
 
 extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending;
 extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 216b93492ba..8571eb1afd7 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -38,6 +38,13 @@ typedef struct RelationData *Relation;
 /* Default directory to store temporary statistics data in */
 #define PG_STAT_TMP_DIR		"pg_stat_tmp"
 
+/*
+ * Interval in milliseconds for flushing FLUSH_IN_TRANSACTION stats to shared
+ * memory.  An explicit transaction always enters idle-in-transaction state
+ * between commands, which is when this timeout is enabled.
+ */
+#define PGSTAT_IDLE_TXN_INTERVAL	10000
+
 /* Values for track_functions GUC variable --- order is significant! */
 typedef enum TrackFunctionsLevel
 {
@@ -536,6 +543,7 @@ extern void pgstat_initialize(void);
 
 /* Functions called from backends */
 extern long pgstat_report_stat(bool force);
+extern void pgstat_report_in_transaction_stat(bool force);
 extern void pgstat_force_next_flush(void);
 
 extern void pgstat_reset_counters(void);
diff --git a/src/include/utils/pgstat_internal.h b/src/include/utils/pgstat_internal.h
index 9b8fbae00ed..e9f4dc85528 100644
--- a/src/include/utils/pgstat_internal.h
+++ b/src/include/utils/pgstat_internal.h
@@ -224,6 +224,18 @@ typedef struct PgStat_SubXactStatus
 	PgStat_TableXactStatus *first;	/* head of list for this subxact */
 } PgStat_SubXactStatus;
 
+/*
+ * Flush mode for statistics kinds.
+ *
+ * FLUSH_AT_TXN_BOUNDARY has to be the first because we want it to be the
+ * default value.
+ */
+typedef enum PgStat_FlushMode
+{
+	FLUSH_AT_TXN_BOUNDARY,		/* All fields can only be flushed at
+								 * transaction boundary */
+	FLUSH_IN_TRANSACTION,		/* All fields can be flushed in a transaction */
+} PgStat_FlushMode;
 
 /*
  * Metadata for a specific kind of statistics.
@@ -251,6 +263,16 @@ typedef struct PgStat_KindInfo
 	 */
 	bool		track_entry_count:1;
 
+	/*
+	 * The mode of when to flush stats. See PgStat_FlushMode for more details.
+	 *
+	 * This member only has meaning for statistics kinds that accumulate
+	 * pending stats and use flush callbacks. For kinds that write directly to
+	 * shared memory (e.g., archiver, bgwriter, checkpointer), this member has
+	 * no effect.
+	 */
+	PgStat_FlushMode flush_mode;
+
 	/*
 	 * The size of an entry in the shared stats hash table (pointed to by
 	 * PgStatShared_HashEntry->body).  For fixed-numbered statistics, this is
@@ -297,8 +319,13 @@ typedef struct PgStat_KindInfo
 	 * For variable-numbered stats: flush pending stats. Required if pending
 	 * data is used. See flush_static_cb when dealing with stats data that
 	 * that cannot use PgStat_EntryRef->pending.
+	 *
+	 * The in_txn_only parameter indicates whether this is an in-transaction
+	 * flush. The is_partial parameter indicates whether this is a partial
+	 * flush.
 	 */
-	bool		(*flush_pending_cb) (PgStat_EntryRef *sr, bool nowait);
+	bool		(*flush_pending_cb) (PgStat_EntryRef *sr, bool nowait,
+									 bool in_txn_only, bool *is_partial);
 
 	/*
 	 * For variable-numbered stats: delete pending stats. Optional.
@@ -366,8 +393,11 @@ typedef struct PgStat_KindInfo
 	 *
 	 * "pgstat_report_fixed" needs to be set to trigger the flush of pending
 	 * stats.
+	 *
+	 * The in_txn_only parameter indicates whether this is an in-transaction
+	 * flush.
 	 */
-	bool		(*flush_static_cb) (bool nowait);
+	bool		(*flush_static_cb) (bool nowait, bool in_txn_only);
 
 	/*
 	 * For fixed-numbered statistics: Reset All.
@@ -696,8 +726,8 @@ extern void pgstat_archiver_snapshot_cb(void);
 #define PGSTAT_BACKEND_FLUSH_WAL   (1 << 1) /* Flush WAL statistics */
 #define PGSTAT_BACKEND_FLUSH_ALL   (PGSTAT_BACKEND_FLUSH_IO | PGSTAT_BACKEND_FLUSH_WAL)
 
-extern bool pgstat_flush_backend(bool nowait, bits32 flags);
-extern bool pgstat_backend_flush_cb(bool nowait);
+extern bool pgstat_flush_backend(bool nowait, bits32 flags, bool in_txn_only);
+extern bool pgstat_backend_flush_cb(bool nowait, bool in_txn_only);
 extern void pgstat_backend_reset_timestamp_cb(PgStatShared_Common *header,
 											  TimestampTz ts);
 
@@ -729,7 +759,8 @@ extern void AtEOXact_PgStat_Database(bool isCommit, bool parallel);
 
 extern PgStat_StatDBEntry *pgstat_prep_database_pending(Oid dboid);
 extern void pgstat_reset_database_timestamp(Oid dboid, TimestampTz ts);
-extern bool pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait);
+extern bool pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait,
+									 bool in_txn_only, bool *is_partial);
 extern void pgstat_database_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts);
 
 
@@ -737,7 +768,8 @@ extern void pgstat_database_reset_timestamp_cb(PgStatShared_Common *header, Time
  * Functions in pgstat_function.c
  */
 
-extern bool pgstat_function_flush_cb(PgStat_EntryRef *entry_ref, bool nowait);
+extern bool pgstat_function_flush_cb(PgStat_EntryRef *entry_ref, bool nowait,
+									 bool in_txn_only, bool *is_partial);
 extern void pgstat_function_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts);
 
 
@@ -745,9 +777,9 @@ extern void pgstat_function_reset_timestamp_cb(PgStatShared_Common *header, Time
  * Functions in pgstat_io.c
  */
 
-extern void pgstat_flush_io(bool nowait);
+extern void pgstat_flush_io(bool nowait, bool in_txn_only);
 
-extern bool pgstat_io_flush_cb(bool nowait);
+extern bool pgstat_io_flush_cb(bool nowait, bool in_txn_only);
 extern void pgstat_io_init_shmem_cb(void *stats);
 extern void pgstat_io_reset_all_cb(TimestampTz ts);
 extern void pgstat_io_snapshot_cb(void);
@@ -762,7 +794,8 @@ extern void AtEOSubXact_PgStat_Relations(PgStat_SubXactStatus *xact_state, bool
 extern void AtPrepare_PgStat_Relations(PgStat_SubXactStatus *xact_state);
 extern void PostPrepare_PgStat_Relations(PgStat_SubXactStatus *xact_state);
 
-extern bool pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait);
+extern bool pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait,
+									 bool in_txn_only, bool *is_partial);
 extern void pgstat_relation_delete_pending_cb(PgStat_EntryRef *entry_ref);
 extern void pgstat_relation_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts);
 
@@ -809,7 +842,7 @@ extern PgStatShared_Common *pgstat_init_entry(PgStat_Kind kind,
  * Functions in pgstat_slru.c
  */
 
-extern bool pgstat_slru_flush_cb(bool nowait);
+extern bool pgstat_slru_flush_cb(bool nowait, bool in_txn_only);
 extern void pgstat_slru_init_shmem_cb(void *stats);
 extern void pgstat_slru_reset_all_cb(TimestampTz ts);
 extern void pgstat_slru_snapshot_cb(void);
@@ -820,7 +853,7 @@ extern void pgstat_slru_snapshot_cb(void);
  */
 
 extern void pgstat_wal_init_backend_cb(void);
-extern bool pgstat_wal_flush_cb(bool nowait);
+extern bool pgstat_wal_flush_cb(bool nowait, bool in_txn_only);
 extern void pgstat_wal_init_shmem_cb(void *stats);
 extern void pgstat_wal_reset_all_cb(TimestampTz ts);
 extern void pgstat_wal_snapshot_cb(void);
@@ -830,7 +863,8 @@ extern void pgstat_wal_snapshot_cb(void);
  * Functions in pgstat_subscription.c
  */
 
-extern bool pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait);
+extern bool pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait,
+										 bool in_txn_only, bool *is_partial);
 extern void pgstat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts);
 
 
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 0965b590b34..68fe814bde6 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -34,6 +34,7 @@ typedef enum TimeoutId
 	TRANSACTION_TIMEOUT,
 	IDLE_SESSION_TIMEOUT,
 	IDLE_STATS_UPDATE_TIMEOUT,
+	IN_TRANSACTION_STATS_UPDATE_TIMEOUT,
 	CLIENT_CONNECTION_CHECK_TIMEOUT,
 	STARTUP_PROGRESS_TIMEOUT,
 	/* First user-definable timeout reason */
diff --git a/src/test/modules/test_custom_stats/t/001_custom_stats.pl b/src/test/modules/test_custom_stats/t/001_custom_stats.pl
index 9e6a7a38577..d3afbff37c5 100644
--- a/src/test/modules/test_custom_stats/t/001_custom_stats.pl
+++ b/src/test/modules/test_custom_stats/t/001_custom_stats.pl
@@ -156,5 +156,64 @@ $result = $node->safe_psql('postgres',
 );
 is($result, "0", "report of fixed-sized after manual reset");
 
+# Test FLUSH_IN_TRANSACTION custom stats kind.
+# The intxn kind is registered alongside the regular kind by test_custom_var_stats.
+
+# Basic update and report.
+$node->safe_psql('postgres',
+	q(SELECT test_custom_var_intxn_update('intxn_entry1')));
+$node->safe_psql('postgres',
+	q(SELECT test_custom_var_intxn_update('intxn_entry1')));
+$node->safe_psql('postgres',
+	q(SELECT test_custom_var_intxn_update('intxn_entry1')));
+
+$result = $node->safe_psql('postgres',
+	q(SELECT test_custom_var_intxn_report('intxn_entry1')));
+is($result, "3", "intxn stats report after updates");
+
+# Verify intxn stats are not persisted (write_to_file = false).
+$node->stop();
+$node->start();
+
+$result = $node->safe_psql('postgres',
+	q(SELECT test_custom_var_intxn_report('intxn_entry1')));
+is($result, "", "intxn stats lost after clean restart (not persisted)");
+
+# Test in-transaction flushing with injection point.
+SKIP:
+{
+	skip "injection points not supported", 1
+		unless (($ENV{enable_injection_points} // '') eq 'yes'
+		&& $node->check_extension('injection_points'));
+
+	$node->safe_psql('postgres', 'CREATE EXTENSION IF NOT EXISTS injection_points;');
+	$node->safe_psql('postgres',
+		"SELECT injection_points_attach('in-transaction-stats-short-interval', 'error');");
+
+	$node->append_conf('postgresql.conf', 'stats_fetch_consistency = none');
+	$node->reload;
+
+	my $intxn_before = $node->safe_psql('postgres',
+		"SELECT COALESCE(test_custom_var_intxn_report('intxn_flush'), 0);");
+
+	$result = $node->safe_psql('postgres', q{
+BEGIN;
+SELECT test_custom_var_intxn_update('intxn_flush');
+SELECT test_custom_var_intxn_update('intxn_flush');
+SELECT test_custom_var_intxn_update('intxn_flush');
+SELECT pg_sleep(2);
+SELECT COALESCE(test_custom_var_intxn_report('intxn_flush'), 0);
+});
+
+	my @lines = split(/\n/, $result);
+	my $intxn_mid_txn = $lines[-1];
+
+	ok($intxn_mid_txn > $intxn_before,
+		"custom intxn stats flushed mid-transaction (before: $intxn_before, mid-txn: $intxn_mid_txn)");
+
+	$node->safe_psql('postgres',
+		"SELECT injection_points_detach('in-transaction-stats-short-interval');");
+}
+
 # Test completed successfully
 done_testing();
diff --git a/src/test/modules/test_custom_stats/test_custom_var_stats--1.0.sql b/src/test/modules/test_custom_stats/test_custom_var_stats--1.0.sql
index 5ed8cfc2dcf..03650cbc414 100644
--- a/src/test/modules/test_custom_stats/test_custom_var_stats--1.0.sql
+++ b/src/test/modules/test_custom_stats/test_custom_var_stats--1.0.sql
@@ -24,3 +24,13 @@ CREATE FUNCTION test_custom_stats_var_report(INOUT name TEXT,
 RETURNS SETOF record
 AS 'MODULE_PATHNAME', 'test_custom_stats_var_report'
 LANGUAGE C STRICT PARALLEL UNSAFE;
+
+CREATE FUNCTION test_custom_var_intxn_update(IN name TEXT)
+RETURNS void
+AS 'MODULE_PATHNAME', 'test_custom_var_intxn_update'
+LANGUAGE C STRICT PARALLEL UNSAFE;
+
+CREATE FUNCTION test_custom_var_intxn_report(IN name TEXT)
+RETURNS bigint
+AS 'MODULE_PATHNAME', 'test_custom_var_intxn_report'
+LANGUAGE C STRICT PARALLEL UNSAFE;
diff --git a/src/test/modules/test_custom_stats/test_custom_var_stats.c b/src/test/modules/test_custom_stats/test_custom_var_stats.c
index 2ef0e903745..e87d47a28ab 100644
--- a/src/test/modules/test_custom_stats/test_custom_var_stats.c
+++ b/src/test/modules/test_custom_stats/test_custom_var_stats.c
@@ -37,6 +37,11 @@ PG_MODULE_MAGIC_EXT(
  */
 #define PGSTAT_KIND_TEST_CUSTOM_VAR_STATS 25
 
+/*
+ * Kind ID for test_custom_var_intxn_stats statistics (FLUSH_IN_TRANSACTION).
+ */
+#define PGSTAT_KIND_TEST_CUSTOM_VAR_INTXN_STATS 27
+
 /* File paths for auxiliary data serialization */
 #define TEST_CUSTOM_AUX_DATA_DESC "pg_stat/test_custom_var_stats_desc.stats"
 
@@ -45,6 +50,8 @@ PG_MODULE_MAGIC_EXT(
  */
 #define PGSTAT_CUSTOM_VAR_STATS_IDX(name) hash_bytes_extended((const unsigned char *) name, strlen(name), 0)
 
+#define PGSTAT_CUSTOM_VAR_INTXN_IDX(name) hash_bytes_extended((const unsigned char *) name, strlen(name), 0)
+
 /*--------------------------------------------------------------------------
  * Type definitions
  *--------------------------------------------------------------------------
@@ -56,6 +63,12 @@ typedef struct PgStat_StatCustomVarEntry
 	PgStat_Counter numcalls;	/* times statistic was incremented */
 } PgStat_StatCustomVarEntry;
 
+/* Pending entry for FLUSH_IN_TRANSACTION kind (same layout) */
+typedef struct PgStat_CustomVarInTxnEntry
+{
+	PgStat_Counter numcalls;
+}			PgStat_CustomVarInTxnEntry;
+
 /* Shared memory statistics entry visible to all backends */
 typedef struct PgStatShared_CustomVarEntry
 {
@@ -64,6 +77,13 @@ typedef struct PgStatShared_CustomVarEntry
 	dsa_pointer description;	/* pointer to description string in DSA */
 } PgStatShared_CustomVarEntry;
 
+/* Shared memory entry for FLUSH_IN_TRANSACTION kind */
+typedef struct PgStatShared_CustomVarInTxnEntry
+{
+	PgStatShared_Common header;
+	PgStat_CustomVarInTxnEntry stats;
+}			PgStatShared_CustomVarInTxnEntry;
+
 /*--------------------------------------------------------------------------
  * Global Variables
  *--------------------------------------------------------------------------
@@ -85,7 +105,8 @@ static dsa_area *custom_stats_description_dsa = NULL;
 
 /* Flush callback: merge pending stats into shared memory */
 static bool test_custom_stats_var_flush_pending_cb(PgStat_EntryRef *entry_ref,
-												   bool nowait);
+												   bool nowait, bool in_txn_only,
+												   bool *is_partial);
 
 /* Serialization callback: write auxiliary entry data */
 static void test_custom_stats_var_to_serialized_data(const PgStat_HashKey *key,
@@ -100,6 +121,11 @@ static bool test_custom_stats_var_from_serialized_data(const PgStat_HashKey *key
 /* Finish callback: end of statistics file operations */
 static void test_custom_stats_var_finish(PgStat_StatsFileOp status);
 
+/* Flush callback for FLUSH_IN_TRANSACTION kind */
+static bool test_custom_var_intxn_flush_pending_cb(PgStat_EntryRef *entry_ref,
+												   bool nowait, bool in_txn_only,
+												   bool *is_partial);
+
 /*--------------------------------------------------------------------------
  * Custom kind configuration
  *--------------------------------------------------------------------------
@@ -121,6 +147,19 @@ static const PgStat_KindInfo custom_stats = {
 	.finish = test_custom_stats_var_finish,
 };
 
+static const PgStat_KindInfo custom_intxn_stats = {
+	.name = "test_custom_var_intxn_stats",
+	.fixed_amount = false,
+	.write_to_file = false,		/* no persistence needed for test */
+	.flush_mode = FLUSH_IN_TRANSACTION,
+	.accessed_across_databases = true,
+	.shared_size = sizeof(PgStatShared_CustomVarInTxnEntry),
+	.shared_data_off = offsetof(PgStatShared_CustomVarInTxnEntry, stats),
+	.shared_data_len = sizeof(PgStat_CustomVarInTxnEntry),
+	.pending_size = sizeof(PgStat_CustomVarInTxnEntry),
+	.flush_pending_cb = test_custom_var_intxn_flush_pending_cb,
+};
+
 /*--------------------------------------------------------------------------
  * Module initialization
  *--------------------------------------------------------------------------
@@ -133,8 +172,9 @@ _PG_init(void)
 	if (!process_shared_preload_libraries_in_progress)
 		return;
 
-	/* Register custom statistics kind */
+	/* Register custom statistics kinds */
 	pgstat_register_kind(PGSTAT_KIND_TEST_CUSTOM_VAR_STATS, &custom_stats);
+	pgstat_register_kind(PGSTAT_KIND_TEST_CUSTOM_VAR_INTXN_STATS, &custom_intxn_stats);
 }
 
 /*--------------------------------------------------------------------------
@@ -152,7 +192,8 @@ _PG_init(void)
  * Returns false only if nowait=true and lock acquisition fails.
  */
 static bool
-test_custom_stats_var_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait)
+test_custom_stats_var_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait,
+									   bool in_txn_only, bool *is_partial)
 {
 	PgStat_StatCustomVarEntry *pending_entry;
 	PgStatShared_CustomVarEntry *shared_entry;
@@ -160,6 +201,9 @@ test_custom_stats_var_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait)
 	pending_entry = (PgStat_StatCustomVarEntry *) entry_ref->pending;
 	shared_entry = (PgStatShared_CustomVarEntry *) entry_ref->shared_stats;
 
+	/* this is not a partial flush */
+	*is_partial = false;
+
 	if (!pgstat_lock_entry(entry_ref, nowait))
 		return false;
 
@@ -691,3 +735,67 @@ test_custom_stats_var_report(PG_FUNCTION_ARGS)
 
 	SRF_RETURN_DONE(funcctx);
 }
+
+/*--------------------------------------------------------------------------
+ * FLUSH_IN_TRANSACTION kind: callbacks and SQL functions
+ *--------------------------------------------------------------------------
+ */
+
+static bool
+test_custom_var_intxn_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait,
+									   bool in_txn_only, bool *is_partial)
+{
+	PgStat_CustomVarInTxnEntry *pending;
+	PgStatShared_CustomVarInTxnEntry *shared;
+
+	pending = (PgStat_CustomVarInTxnEntry *) entry_ref->pending;
+	shared = (PgStatShared_CustomVarInTxnEntry *) entry_ref->shared_stats;
+
+	*is_partial = false;
+
+	if (!pgstat_lock_entry(entry_ref, nowait))
+		return false;
+
+	shared->stats.numcalls += pending->numcalls;
+
+	pgstat_unlock_entry(entry_ref);
+
+	return true;
+}
+
+PG_FUNCTION_INFO_V1(test_custom_var_intxn_update);
+Datum
+test_custom_var_intxn_update(PG_FUNCTION_ARGS)
+{
+	char	   *stat_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	PgStat_EntryRef *entry_ref;
+	PgStat_CustomVarInTxnEntry *pending;
+
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_TEST_CUSTOM_VAR_INTXN_STATS,
+										  InvalidOid,
+										  PGSTAT_CUSTOM_VAR_INTXN_IDX(stat_name),
+										  NULL);
+
+	pending = (PgStat_CustomVarInTxnEntry *) entry_ref->pending;
+	pending->numcalls++;
+
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(test_custom_var_intxn_report);
+Datum
+test_custom_var_intxn_report(PG_FUNCTION_ARGS)
+{
+	char	   *stat_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	PgStat_CustomVarInTxnEntry *stats;
+
+	stats = (PgStat_CustomVarInTxnEntry *)
+		pgstat_fetch_entry(PGSTAT_KIND_TEST_CUSTOM_VAR_INTXN_STATS,
+						   InvalidOid,
+						   PGSTAT_CUSTOM_VAR_INTXN_IDX(stat_name));
+
+	if (!stats)
+		PG_RETURN_NULL();
+
+	PG_RETURN_INT64(stats->numcalls);
+}
diff --git a/src/test/modules/test_misc/meson.build b/src/test/modules/test_misc/meson.build
index 6e8db1621a7..70248360d21 100644
--- a/src/test/modules/test_misc/meson.build
+++ b/src/test/modules/test_misc/meson.build
@@ -19,6 +19,7 @@ tests += {
       't/008_replslot_single_user.pl',
       't/009_log_temp_files.pl',
       't/010_index_concurrently_upsert.pl',
+      't/011_in_transaction_stats.pl',
     ],
     # The injection points are cluster-wide, so disable installcheck
     'runningcheck': false,
diff --git a/src/test/modules/test_misc/t/011_in_transaction_stats.pl b/src/test/modules/test_misc/t/011_in_transaction_stats.pl
new file mode 100644
index 00000000000..cf1bde24a5d
--- /dev/null
+++ b/src/test/modules/test_misc/t/011_in_transaction_stats.pl
@@ -0,0 +1,91 @@
+
+# Copyright (c) 2024-2026, PostgreSQL Global Development Group
+
+# Test in-transaction stats flushing mechanism.
+#
+# This test verifies that FLUSH_IN_TRANSACTION stats are periodically flushed
+# to shared memory while a transaction is idle between commands.  Uses an
+# injection point to reduce the flush interval from 10 seconds to 1 second
+# for faster testing.
+#
+# We test one representative of each stats kind: relation stats (seq_scan)
+# for variable-numbered stats, and WAL for fixed-sized stats.
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init();
+$node->append_conf('postgresql.conf', 'stats_fetch_consistency = none');
+$node->start;
+
+if (!$node->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+$node->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Attach injection point to reduce the in-transaction stats flush interval
+# to 1 second.
+$node->safe_psql('postgres',
+	"SELECT injection_points_attach('in-transaction-stats-short-interval', 'error');");
+
+# Create test table
+$node->safe_psql('postgres',
+	'CREATE TABLE test_in_txn_stats(a int) WITH (autovacuum_enabled = off);');
+$node->safe_psql('postgres',
+	'INSERT INTO test_in_txn_stats SELECT generate_series(1, 1000);');
+
+# Force flush and get baseline stats
+$node->safe_psql('postgres', 'SELECT pg_stat_force_next_flush();');
+my $seq_scan_before = $node->safe_psql('postgres',
+	"SELECT seq_scan FROM pg_stat_user_tables WHERE relname = 'test_in_txn_stats';");
+
+# Test that seq_scan stats (variable-numbered) are flushed mid-transaction
+# via the in-transaction periodic flush mechanism.
+my $result = $node->safe_psql('postgres', q{
+BEGIN;
+SELECT COUNT(*) FROM test_in_txn_stats;
+SELECT COUNT(*) FROM test_in_txn_stats;
+SELECT pg_sleep(2);
+SELECT seq_scan FROM pg_stat_user_tables WHERE relname = 'test_in_txn_stats';
+});
+
+my @lines = split(/\n/, $result);
+my $seq_scan_mid_txn = $lines[-1];
+
+ok($seq_scan_mid_txn > $seq_scan_before,
+	"seq_scan stats flushed during transaction (before: $seq_scan_before, mid-txn: $seq_scan_mid_txn)");
+
+# Test WAL stats (fixed-sized) flushing during transaction.
+$node->safe_psql('postgres', 'SELECT pg_stat_reset_shared(\'wal\');');
+my $wal_records_before = $node->safe_psql('postgres',
+	"SELECT wal_records FROM pg_stat_wal;");
+
+$result = $node->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_in_txn_stats SELECT generate_series(1, 1000);
+SELECT pg_sleep(2);
+SELECT wal_records FROM pg_stat_wal;
+});
+
+@lines = split(/\n/, $result);
+my $wal_records_mid_txn = $lines[-1];
+
+ok($wal_records_mid_txn > $wal_records_before,
+	"WAL stats flushed during transaction (before: $wal_records_before, mid-txn: $wal_records_mid_txn)");
+
+# Cleanup
+$node->safe_psql('postgres', 'DROP TABLE test_in_txn_stats;');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 52f8603a7be..c9cd6087abc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2290,6 +2290,7 @@ PgStat_Counter
 PgStat_EntryRef
 PgStat_EntryRefHashEntry
 PgStat_FetchConsistency
+PgStat_FlushMode
 PgStat_FunctionCallUsage
 PgStat_FunctionCounts
 PgStat_HashKey
-- 
2.47.3

