Hi,

Here's a patch adding prefetch/read-ahead info about IO to EXPLAIN. This
was initially developed as part of the index prefetch patches, but it
became clear it can be useful for other nodes using ReadStream. That
includes seqscan and bitmap heap scans, and (hopefully) will include
index scan in PG19.

The 0001 patch is a prerequisite, adjusting WaitReadBuffers to indicate
whether the buffer read had to wait for the I/O. 0002 is the part this
thread is really about.


In EXPLAIN, this new info is enabled by a new option "IO", which adds
two new lines to the output:

--------------------
EXPLAIN (ANALYZE, COSTS off, TIMING off, IO)
 SELECT * FROM t WHERE a < 100000;

                   QUERY PLAN
-------------------------------------------------
 Seq Scan on t (actual rows=999996.00 loops=1)
   Filter: (a < 100000)
   Rows Removed by Filter: 4
   Prefetch: avg=262.629 max=271 capacity=272
   I/O: stalls=653 size=15.983 inprogress=15.934
   Buffers: shared read=55556
 Planning:
   Buffers: shared hit=50 read=23
 Planning Time: 7.358 ms
 Execution Time: 358.214 ms
(10 rows)
--------------------

The first line "Prefetch" tracks the look-ahead distance, i.e. how many
blocks ahead the ReadStream is requesting.

The second line "I/O" is about the I/O requests actually issued - how
many times we had to wait for a block (when we got to process it), the
average size of a request (in BLCKSZ blocks), and the average number of
in-progress requests.

The above example shows we've been reading ~262 blocks ahead on average,
with a theoretical maximum of 272 (and at some point we actually read
271 blocks ahead). The average is pretty close to the maximum, so we've
been reading ~2MB ahead, which seems pretty good.

The I/O stats say we've been reading in 16-block requests, i.e. 128kB
chunks (which aligns with io_combine_limit=16). And when issuing a new
I/O, there were ~16 requests in progress already (which aligns with the
maximum distance, because 16*17=272).


We could track much more information. The WIP patches tracked histograms
of various metrics (distances, sizes, ...), which was quite handy during
development. For regular EXPLAIN we chose only the most useful subset,
to keep it meaningful and cheap to collect, and also generic enough to
work for other table AM implementations.

Regarding the basic design, here are a couple of topics for review:


1) The information is collected by ReadStream, unconditionally.

I experimented with having a flag to "request" collecting these stats
(so that it could be done only for EXPLAIN ANALYZE), but that turned out
to be pretty useless. The stats are super cheap, so the extra flag did
not make a meaningful difference, and passing the flag to the
scan/stream was way too invasive. So all ReadStreams track it.

There's a ReadStreamInstrumentation struct defined in instrument_node.h,
but I'm not convinced that's the right place. Maybe it should be defined
in read_stream.h instead.


2) This is inherently a TAM implementation detail.

The ReadStream is "inside" the TAM, and some TAMs may not even use a
ReadStream to do I/O. So EXPLAIN can't just inspect the stream directly,
that'd violate the layering. It needs to do it through the TAM API.

So this introduces a new TAM callback "scan_stats" which gets a scan
descriptor, and extracts stats from the stream (if any). The index
prefetching will need a second callback for IndexScanDesc.

It also introduces a "generic" TableScanStatsData struct, so that
EXPLAIN does not work with ReadStreamInstrumentation directly (because
what if the TAM does not use a ReadStream?). Of course, for heap AM the
mapping is 1:1.

For the heap AM the callback is pretty simple - it just collects the
read stream stats and "translates" them into a TableScanStats instance.


3) Adds shared instrumentation to SeqScan workers.

The patch also had to extend parallel SeqScan to collect per-worker
instrumentation, similarly to BitmapHeapScan. Until now this was not
needed, but it is needed for tracking per-worker prefetch stats.


Here are a couple of questions I'm asking myself about this patch:

- Is the selected information really the minimal meaningful set of stats
we could track? Is it sufficiently clear what we're tracking, and/or
should we track something else?

- Is the TableScanStatsData generic enough? Can other TAMs (not using
ReadStream) use this to show meaningful stats?

- Is the separation between the TAM and the low-level instrumentation
clear enough? Or is the ReadStreamInstrumentation "leaking" somewhere?
For example, is it OK that it's in SeqScanInstrumentation?

But I'm sure there are other questions I haven't thought of.


regards

-- 
Tomas Vondra
From 93df10e306368b02ccb00a160ab6adee8317c3c1 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 3 Mar 2026 16:50:50 -0500
Subject: [PATCH v1 1/2] bufmgr: Return whether WaitReadBuffers() needed to
 wait

In a subsequent commit read_stream.c will use this as an input to the read
ahead distance.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/buffer/bufmgr.c | 18 +++++++++++++++++-
 src/include/storage/bufmgr.h        |  2 +-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 00bc609529a..c8ec7fb4f53 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1738,12 +1738,20 @@ ProcessReadBuffersResult(ReadBuffersOperation *operation)
 	Assert(operation->nblocks_done <= operation->nblocks);
 }
 
-void
+/*
+ * Wait for the IO operation initiated by StartReadBuffers() et al to
+ * complete.
+ *
+ * Returns whether the caller actually had to wait for the IO, rather than
+ * finding it already completed by the time of this call.
+ */
+bool
 WaitReadBuffers(ReadBuffersOperation *operation)
 {
 	PgAioReturn *aio_ret = &operation->io_return;
 	IOContext	io_context;
 	IOObject	io_object;
+	bool		needed_wait = false;
 
 	if (operation->persistence == RELPERSISTENCE_TEMP)
 	{
@@ -1805,6 +1813,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
 
 				pgaio_wref_wait(&operation->io_wref);
+				needed_wait = true;
 
 				/*
 				 * The IO operation itself was already counted earlier, in
@@ -1835,6 +1844,12 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 
 		CHECK_FOR_INTERRUPTS();
 
+		/*
+		 * If the IO completed only partially, we need to perform additional
+		 * work, consider that a form of having had to wait.
+		 */
+		needed_wait = true;
+
 		/*
 		 * This may only complete the IO partially, either because some
 		 * buffers were already valid, or because of a partial read.
@@ -1851,6 +1866,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	CheckReadBuffersOperation(operation, true);
 
 	/* NB: READ_DONE tracepoint was already executed in completion callback */
+	return needed_wait;
 }
 
 /*
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 4017896f951..a1ef04354dd 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -249,7 +249,7 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation,
 							 BlockNumber blockNum,
 							 int *nblocks,
 							 int flags);
-extern void WaitReadBuffers(ReadBuffersOperation *operation);
+extern bool WaitReadBuffers(ReadBuffersOperation *operation);
 
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
-- 
2.53.0

From 8b522d83b9eea412a4d01de2fea071926482b333 Mon Sep 17 00:00:00 2001
From: test <test>
Date: Thu, 12 Mar 2026 02:04:07 +0100
Subject: [PATCH v1 2/2] explain: show prefetch stats in EXPLAIN (ANALYZE,
 IO)

This adds details about AIO / prefetch for a number of executor nodes
using the ReadStream, notably:

- SeqScan
- BitmapHeapScan

The statistics are tracked by the ReadStream, and then propagated up
through the table AM interface.

The ReadStream tracks the statistics unconditionally, i.e. even outside
EXPLAIN ANALYZE etc. The amount of statistics is trivial (a handful of
integer counters), so it's not worth gating this behind a flag.

The TAM gets one new callback "scan_stats" to collect stats from the
scan (which fetches tuples from the TAM). There is also a new struct
TableScanStatsData/TableScanStats to separate the statistics from the
actual TAM implementation.

This required improving SeqScan to properly set up shared instrumentation
between the leader and workers, collect it at the end of the scan, etc.
---
 doc/src/sgml/ref/explain.sgml             |  12 +
 src/backend/access/heap/heapam.c          |  32 +++
 src/backend/access/heap/heapam_handler.c  |   1 +
 src/backend/commands/explain.c            | 262 ++++++++++++++++++++++
 src/backend/commands/explain_state.c      |  10 +
 src/backend/executor/execParallel.c       |   3 +
 src/backend/executor/nodeBitmapHeapscan.c |  16 ++
 src/backend/executor/nodeSeqscan.c        | 123 +++++++++-
 src/backend/storage/aio/read_stream.c     |  64 +++++-
 src/include/access/heapam.h               |   1 +
 src/include/access/relscan.h              |  27 +++
 src/include/access/tableam.h              |  17 ++
 src/include/commands/explain_state.h      |   1 +
 src/include/executor/instrument_node.h    |  45 +++-
 src/include/executor/nodeSeqscan.h        |   1 +
 src/include/nodes/execnodes.h             |   2 +
 src/include/storage/read_stream.h         |   3 +
 17 files changed, 615 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/ref/explain.sgml b/doc/src/sgml/ref/explain.sgml
index 7dee77fd366..9da16c77d73 100644
--- a/doc/src/sgml/ref/explain.sgml
+++ b/doc/src/sgml/ref/explain.sgml
@@ -46,6 +46,7 @@ EXPLAIN [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] <rep
     TIMING [ <replaceable class="parameter">boolean</replaceable> ]
     SUMMARY [ <replaceable class="parameter">boolean</replaceable> ]
     MEMORY [ <replaceable class="parameter">boolean</replaceable> ]
+    IO [ <replaceable class="parameter">boolean</replaceable> ]
     FORMAT { TEXT | XML | JSON | YAML }
 </synopsis>
  </refsynopsisdiv>
@@ -295,6 +296,17 @@ ROLLBACK;
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>IO</literal></term>
+    <listitem>
+     <para>
+      Include information on I/O performed by each node.
+      This parameter may only be used when <literal>ANALYZE</literal> is also
+      enabled.  It defaults to <literal>FALSE</literal>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>FORMAT</literal></term>
     <listitem>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8f1c11a9350..91ed8ce97c3 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1406,6 +1406,38 @@ heap_endscan(TableScanDesc sscan)
 	pfree(scan);
 }
 
+/*
+ * heap_scan_stats
+ *		return stats collected by the read stream
+ *
+ * Returns NULL if the scan is not using a read stream.
+ */
+TableScanStats
+heap_scan_stats(TableScanDesc sscan)
+{
+	HeapScanDesc scan = (HeapScanDesc) sscan;
+	ReadStreamInstrumentation stats;
+	TableScanStats res;
+
+	if (!scan->rs_read_stream)
+		return NULL;
+
+	stats = read_stream_prefetch_stats(scan->rs_read_stream);
+
+	res = palloc0(sizeof(TableScanStatsData));
+
+	res->prefetch_count = stats.prefetch_count;
+	res->distance_sum = stats.distance_sum;
+	res->distance_max = stats.distance_max;
+	res->distance_capacity = stats.distance_capacity;
+	res->stall_count = stats.stall_count;
+	res->io_count = stats.io_count;
+	res->io_nblocks = stats.io_nblocks;
+	res->io_in_progress = stats.io_in_progress;
+
+	return res;
+}
+
 HeapTuple
 heap_getnext(TableScanDesc sscan, ScanDirection direction)
 {
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 2d1ee9ac95d..1bf14b428e1 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2641,6 +2641,7 @@ static const TableAmRoutine heapam_methods = {
 
 	.scan_set_tidrange = heap_set_tidrange,
 	.scan_getnextslot_tidrange = heap_getnextslot_tidrange,
+	.scan_stats = heap_scan_stats,
 
 	.parallelscan_estimate = table_block_parallelscan_estimate,
 	.parallelscan_initialize = table_block_parallelscan_initialize,
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 93918a223b8..8a86390c45b 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -13,6 +13,8 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
+#include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/pg_type.h"
 #include "commands/createas.h"
@@ -138,6 +140,11 @@ static void show_hashagg_info(AggState *aggstate, ExplainState *es);
 static void show_indexsearches_info(PlanState *planstate, ExplainState *es);
 static void show_tidbitmap_info(BitmapHeapScanState *planstate,
 								ExplainState *es);
+static void show_scan_io_info(ScanState *planstate,
+							  ExplainState *es);
+static void show_worker_io_info(PlanState *planstate,
+								ExplainState *es,
+								int worker);
 static void show_instrumentation_count(const char *qlabel, int which,
 									   PlanState *planstate, ExplainState *es);
 static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es);
@@ -2007,6 +2014,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_instrumentation_count("Rows Removed by Filter", 1,
 										   planstate, es);
 			show_tidbitmap_info((BitmapHeapScanState *) planstate, es);
+			show_scan_io_info((ScanState *) planstate, es);
 			break;
 		case T_SampleScan:
 			show_tablesample(((SampleScan *) plan)->tablesample,
@@ -2025,6 +2033,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 										   planstate, es);
 			if (IsA(plan, CteScan))
 				show_ctescan_info(castNode(CteScanState, planstate), es);
+			show_scan_io_info((ScanState *) planstate, es);
 			break;
 		case T_Gather:
 			{
@@ -2313,6 +2322,10 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_buffer_usage(es, &instrument->bufusage);
 			if (es->wal)
 				show_wal_usage(es, &instrument->walusage);
+
+			/* show prefetch info for the given worker */
+			show_worker_io_info(planstate, es, n);
+
 			ExplainCloseWorker(n, es);
 		}
 	}
@@ -3983,6 +3996,255 @@ show_tidbitmap_info(BitmapHeapScanState *planstate, ExplainState *es)
 	}
 }
 
+/*
+ * show_scan_io_info
+ *		show info about prefetching for a seq/bitmap scan
+ *
+ * Shows summary of stats for leader and workers (if any).
+ */
+static void
+show_scan_io_info(ScanState *planstate, ExplainState *es)
+{
+	Plan	   *plan = planstate->ps.plan;
+	TableScanStats	leader_stats;
+	TableScanStatsData	stats;
+
+	if (!es->io)
+		return;
+
+	/* scan not started, no prefetch stats */
+	if (!(planstate && planstate->ss_currentScanDesc))
+		return;
+
+	/* collect prefetch statistics from the read stream */
+	leader_stats = table_scan_stats(planstate->ss_currentScanDesc);
+
+	if (leader_stats)
+	{
+		memcpy(&stats, leader_stats, sizeof(TableScanStatsData));
+	}
+	else
+	{
+		memset(&stats, 0, sizeof(TableScanStatsData));
+	}
+
+	/* Initialize counters with stats from the local process first */
+	switch (nodeTag(plan))
+	{
+		case T_SeqScan:
+			{
+				SharedSeqScanInstrumentation *sinstrument
+					= ((SeqScanState *) planstate)->sinstrument;
+
+				/* get the sum of the counters set within each and every process */
+				if (sinstrument)
+				{
+					for (int i = 0; i < sinstrument->num_workers; ++i)
+					{
+						SeqScanInstrumentation *winstrument = &sinstrument->sinstrument[i];
+
+						stats.prefetch_count += winstrument->stream.prefetch_count;
+						stats.distance_sum += winstrument->stream.distance_sum;
+						if (winstrument->stream.distance_max > stats.distance_max)
+							stats.distance_max = winstrument->stream.distance_max;
+						if (winstrument->stream.distance_capacity > stats.distance_capacity)
+							stats.distance_capacity = winstrument->stream.distance_capacity;
+						stats.stall_count += winstrument->stream.stall_count;
+						stats.io_count += winstrument->stream.io_count;
+						stats.io_nblocks += winstrument->stream.io_nblocks;
+						stats.io_in_progress += winstrument->stream.io_in_progress;
+					}
+				}
+
+				break;
+			}
+		case T_BitmapHeapScan:
+			{
+				SharedBitmapHeapInstrumentation *sinstrument
+					= ((BitmapHeapScanState *) planstate)->sinstrument;
+
+				/* get the sum of the counters set within each and every process */
+				if (sinstrument)
+				{
+					for (int i = 0; i < sinstrument->num_workers; ++i)
+					{
+						BitmapHeapScanInstrumentation *winstrument = &sinstrument->sinstrument[i];
+
+						stats.prefetch_count += winstrument->stream.prefetch_count;
+						stats.distance_sum += winstrument->stream.distance_sum;
+						if (winstrument->stream.distance_max > stats.distance_max)
+							stats.distance_max = winstrument->stream.distance_max;
+						if (winstrument->stream.distance_capacity > stats.distance_capacity)
+							stats.distance_capacity = winstrument->stream.distance_capacity;
+						stats.stall_count += winstrument->stream.stall_count;
+						stats.io_count += winstrument->stream.io_count;
+						stats.io_nblocks += winstrument->stream.io_nblocks;
+						stats.io_in_progress += winstrument->stream.io_in_progress;
+					}
+				}
+
+				break;
+			}
+		default:
+			/* ignore other plans */
+			return;
+	}
+
+	/* don't print anything without prefetching */
+	if (stats.prefetch_count > 0)
+	{
+		if (es->format == EXPLAIN_FORMAT_TEXT)
+		{
+			/* prefetch distance info */
+			ExplainIndentText(es);
+			appendStringInfo(es->str, "Prefetch: avg=%.3f max=%d capacity=%d",
+							 (stats.distance_sum * 1.0 / stats.prefetch_count),
+							 stats.distance_max,
+							 stats.distance_capacity);
+			appendStringInfoChar(es->str, '\n');
+
+			/* prefetch I/O info (only if there were actual I/Os) */
+			if (stats.stall_count > 0 || stats.io_count > 0)
+			{
+				ExplainIndentText(es);
+				appendStringInfo(es->str, "I/O: stalls=%" PRIu64,
+								 stats.stall_count);
+
+				if (stats.io_count > 0)
+				{
+					appendStringInfo(es->str, " size=%.3f inprogress=%.3f",
+									 (stats.io_nblocks * 1.0 / stats.io_count),
+									 (stats.io_in_progress * 1.0 / stats.io_count));
+				}
+
+				appendStringInfoChar(es->str, '\n');
+			}
+		}
+		else
+		{
+			ExplainOpenGroup("Prefetch", "I/O", true, es);
+
+			ExplainPropertyFloat("Average Distance", NULL,
+								 (stats.distance_sum * 1.0 / stats.prefetch_count), 3, es);
+			ExplainPropertyInteger("Max Distance", NULL,
+								   stats.distance_max, es);
+			ExplainPropertyInteger("Capacity", NULL,
+								   stats.distance_capacity, es);
+			ExplainPropertyUInteger("Stalls", NULL,
+									stats.stall_count, es);
+
+			if (stats.io_count > 0)
+			{
+				ExplainPropertyFloat("Average IO Size", NULL,
+									 (stats.io_nblocks * 1.0 / stats.io_count), 3, es);
+				ExplainPropertyFloat("Average IOs In Progress", NULL,
+									 (stats.io_in_progress * 1.0 / stats.io_count), 3, es);
+			}
+
+			ExplainCloseGroup("Prefetch", "I/O", true, es);
+		}
+	}
+}
+
+/*
+ * show_worker_io_info
+ *		show info about prefetching for a single worker
+ *
+ * Shows prefetching stats for a parallel scan worker.
+ */
+static void
+show_worker_io_info(PlanState *planstate, ExplainState *es, int worker)
+{
+	Plan	   *plan = planstate->plan;
+	ReadStreamInstrumentation *stats = NULL;
+
+	if (!es->io)
+		return;
+
+	/* get instrumentation for the given worker */
+	switch (nodeTag(plan))
+	{
+		case T_BitmapHeapScan:
+			{
+				BitmapHeapScanState *state = ((BitmapHeapScanState *) planstate);
+				SharedBitmapHeapInstrumentation *sinstrument = state->sinstrument;
+				BitmapHeapScanInstrumentation *instrument = &sinstrument->sinstrument[worker];
+
+				stats = &instrument->stream;
+
+				break;
+			}
+		case T_SeqScan:
+			{
+				SeqScanState *state = ((SeqScanState *) planstate);
+				SharedSeqScanInstrumentation *sinstrument = state->sinstrument;
+				SeqScanInstrumentation *instrument = &sinstrument->sinstrument[worker];
+
+				stats = &instrument->stream;
+
+				break;
+			}
+		default:
+			/* ignore other plans */
+			return;
+	}
+
+	/* don't print stats if there's nothing to report */
+	if (stats->prefetch_count > 0)
+	{
+		if (es->format == EXPLAIN_FORMAT_TEXT)
+		{
+			/* prefetch distance info */
+			ExplainIndentText(es);
+			appendStringInfo(es->str, "Prefetch: avg=%.3f max=%d capacity=%d",
+							 (stats->distance_sum * 1.0 / stats->prefetch_count),
+							 stats->distance_max,
+							 stats->distance_capacity);
+			appendStringInfoChar(es->str, '\n');
+
+			/* prefetch I/O info (only if there were actual I/Os) */
+			if (stats->stall_count > 0 || stats->io_count > 0)
+			{
+				ExplainIndentText(es);
+				appendStringInfo(es->str, "I/O: stalls=%" PRIu64,
+								 stats->stall_count);
+
+				if (stats->io_count > 0)
+				{
+					appendStringInfo(es->str, " size=%.3f inprogress=%.3f",
+									 (stats->io_nblocks * 1.0 / stats->io_count),
+									 (stats->io_in_progress * 1.0 / stats->io_count));
+				}
+
+				appendStringInfoChar(es->str, '\n');
+			}
+		}
+		else
+		{
+			ExplainOpenGroup("Prefetch", "I/O", true, es);
+
+			ExplainPropertyFloat("Average Distance", NULL,
+								 (stats->distance_sum * 1.0 / stats->prefetch_count), 3, es);
+			ExplainPropertyInteger("Max Distance", NULL,
+								   stats->distance_max, es);
+			ExplainPropertyInteger("Capacity", NULL,
+								   stats->distance_capacity, es);
+			ExplainPropertyUInteger("Stalls", NULL,
+									stats->stall_count, es);
+
+			if (stats->io_count > 0)
+			{
+				ExplainPropertyFloat("Average IO Size", NULL,
+									 (stats->io_nblocks * 1.0 / stats->io_count), 3, es);
+				ExplainPropertyFloat("Average IOs In Progress", NULL,
+									 (stats->io_in_progress * 1.0 / stats->io_count), 3, es);
+			}
+
+			ExplainCloseGroup("Prefetch", "I/O", true, es);
+		}
+	}
+}
+
 /*
  * If it's EXPLAIN ANALYZE, show instrumentation information for a plan node
  *
diff --git a/src/backend/commands/explain_state.c b/src/backend/commands/explain_state.c
index 77f59b8e500..f5cbdd21aaa 100644
--- a/src/backend/commands/explain_state.c
+++ b/src/backend/commands/explain_state.c
@@ -159,6 +159,10 @@ ParseExplainOptionList(ExplainState *es, List *options, ParseState *pstate)
 								"EXPLAIN", opt->defname, p),
 						 parser_errposition(pstate, opt->location)));
 		}
+		else if (strcmp(opt->defname, "io") == 0)
+		{
+			es->io = defGetBoolean(opt);
+		}
 		else if (!ApplyExtensionExplainOption(es, opt, pstate))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -185,6 +189,12 @@ ParseExplainOptionList(ExplainState *es, List *options, ParseState *pstate)
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("EXPLAIN option %s requires ANALYZE", "TIMING")));
 
+	/* check that IO is used with EXPLAIN ANALYZE */
+	if (es->io && !es->analyze)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("EXPLAIN option %s requires ANALYZE", "IO")));
+
 	/* check that serialize is used with EXPLAIN ANALYZE */
 	if (es->serialize != EXPLAIN_SERIALIZE_NONE && !es->analyze)
 		ereport(ERROR,
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ac84af294c9..ffc708ed6be 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -1119,6 +1119,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 		case T_BitmapHeapScanState:
 			ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
 			break;
+		case T_SeqScanState:
+			ExecSeqScanRetrieveInstrumentation((SeqScanState *) planstate);
+			break;
 		default:
 			break;
 	}
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 74eac93284e..171d85a45c2 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -275,6 +275,7 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
 	if (node->sinstrument != NULL && IsParallelWorker())
 	{
 		BitmapHeapScanInstrumentation *si;
+		TableScanStats stats;
 
 		Assert(ParallelWorkerNumber < node->sinstrument->num_workers);
 		si = &node->sinstrument->sinstrument[ParallelWorkerNumber];
@@ -288,6 +289,21 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
 		 */
 		si->exact_pages += node->stats.exact_pages;
 		si->lossy_pages += node->stats.lossy_pages;
+
+		/* collect prefetch info for this process from the read_stream */
+		if ((stats = table_scan_stats(node->ss.ss_currentScanDesc)) != NULL)
+		{
+			si->stream.prefetch_count += stats->prefetch_count;
+			si->stream.distance_sum += stats->distance_sum;
+			if (stats->distance_max > si->stream.distance_max)
+				si->stream.distance_max = stats->distance_max;
+			if (stats->distance_capacity > si->stream.distance_capacity)
+				si->stream.distance_capacity = stats->distance_capacity;
+			si->stream.stall_count += stats->stall_count;
+			si->stream.io_count += stats->io_count;
+			si->stream.io_nblocks += stats->io_nblocks;
+			si->stream.io_in_progress += stats->io_in_progress;
+		}
 	}
 
 	/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index af3c788ce8b..5903f9b1b3c 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -294,6 +294,43 @@ ExecEndSeqScan(SeqScanState *node)
 {
 	TableScanDesc scanDesc;
 
+	/*
+	 * When ending a parallel worker, copy the statistics gathered by the
+	 * worker back into shared memory so that it can be picked up by the main
+	 * process to report in EXPLAIN ANALYZE.
+	 */
+	if (node->sinstrument != NULL && IsParallelWorker())
+	{
+		SeqScanInstrumentation *si;
+		TableScanStats stats;
+
+		Assert(ParallelWorkerNumber < node->sinstrument->num_workers);
+		si = &node->sinstrument->sinstrument[ParallelWorkerNumber];
+
+		/*
+		 * Here we accumulate the stats rather than performing memcpy on
+		 * node->stats into si.  When a Gather/GatherMerge node finishes it
+		 * will perform planner shutdown on the workers.  On rescan it will
+		 * spin up new workers which will have a new SeqScanState and
+		 * zeroed stats.
+		 */
+
+		/* collect prefetch info for this process from the read_stream */
+		if ((stats = table_scan_stats(node->ss.ss_currentScanDesc)) != NULL)
+		{
+			si->stream.prefetch_count += stats->prefetch_count;
+			si->stream.distance_sum += stats->distance_sum;
+			if (stats->distance_max > si->stream.distance_max)
+				si->stream.distance_max = stats->distance_max;
+			if (stats->distance_capacity > si->stream.distance_capacity)
+				si->stream.distance_capacity = stats->distance_capacity;
+			si->stream.stall_count += stats->stall_count;
+			si->stream.io_count += stats->io_count;
+			si->stream.io_nblocks += stats->io_nblocks;
+			si->stream.io_in_progress += stats->io_in_progress;
+		}
+	}
+
 	/*
 	 * get information from node
 	 */
@@ -348,10 +385,23 @@ ExecSeqScanEstimate(SeqScanState *node,
 					ParallelContext *pcxt)
 {
 	EState	   *estate = node->ss.ps.state;
+	Size		size;
 
-	node->pscan_len = table_parallelscan_estimate(node->ss.ss_currentRelation,
+	size = table_parallelscan_estimate(node->ss.ss_currentRelation,
 												  estate->es_snapshot);
-	shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+	node->pscan_len = size;
+
+	/* make sure the instrumentation is properly aligned */
+	size = MAXALIGN(size);
+
+	/* account for instrumentation, if required */
+	if (node->ss.ps.instrument && pcxt->nworkers > 0)
+	{
+		size = add_size(size, offsetof(SharedSeqScanInstrumentation, sinstrument));
+		size = add_size(size, mul_size(pcxt->nworkers, sizeof(SeqScanInstrumentation)));
+	}
+
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 }
 
@@ -367,14 +417,42 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 {
 	EState	   *estate = node->ss.ps.state;
 	ParallelTableScanDesc pscan;
+	SharedSeqScanInstrumentation *sinstrument = NULL;
+	Size		size;
+	char	   *ptr;
+
+	/* Recalculate the size. This needs to match ExecSeqScanEstimate. */
+	size = MAXALIGN(node->pscan_len);
+	if (node->ss.ps.instrument && pcxt->nworkers > 0)
+	{
+		size = add_size(size, offsetof(SharedSeqScanInstrumentation, sinstrument));
+		size = add_size(size, mul_size(pcxt->nworkers, sizeof(SeqScanInstrumentation)));
+	}
 
-	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
+	pscan = shm_toc_allocate(pcxt->toc, size);
 	table_parallelscan_initialize(node->ss.ss_currentRelation,
 								  pscan,
 								  estate->es_snapshot);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+
+	/* initialize the shared instrumentation (with correct alignment) */
+	ptr = (char *) pscan;
+	ptr += MAXALIGN(node->pscan_len);
+	if (node->ss.ps.instrument && pcxt->nworkers > 0)
+		sinstrument = (SharedSeqScanInstrumentation *) ptr;
+
+	if (sinstrument)
+	{
+		sinstrument->num_workers = pcxt->nworkers;
+
+		/* ensure any unfilled slots will contain zeroes */
+		memset(sinstrument->sinstrument, 0,
+			   pcxt->nworkers * sizeof(SeqScanInstrumentation));
+	}
+
+	node->sinstrument = sinstrument;
 }
 
 /* ----------------------------------------------------------------
@@ -403,9 +481,48 @@ void
 ExecSeqScanInitializeWorker(SeqScanState *node,
 							ParallelWorkerContext *pwcxt)
 {
+	EState	   *estate = node->ss.ps.state;
 	ParallelTableScanDesc pscan;
+	char	   *ptr;
+	Size		size;
 
 	pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+
+	/*
+	 * Workers don't get the pscan_len value in scan descriptor, so use the
+	 * TAM callback again. The result has to match the earlier result in
+	 * ExecSeqScanEstimate.
+	 */
+	size = table_parallelscan_estimate(node->ss.ss_currentRelation,
+									   estate->es_snapshot);
+
+	ptr = (char *) pscan;
+	ptr += MAXALIGN(size);
+
+	if (node->ss.ps.instrument)
+		node->sinstrument = (SharedSeqScanInstrumentation *) ptr;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSeqScanRetrieveInstrumentation
+ *
+ *		Transfer seq scan statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanRetrieveInstrumentation(SeqScanState *node)
+{
+	SharedSeqScanInstrumentation *sinstrument = node->sinstrument;
+	Size		size;
+
+	if (sinstrument == NULL)
+		return;
+
+	size = offsetof(SharedSeqScanInstrumentation, sinstrument)
+		+ sinstrument->num_workers * sizeof(SeqScanInstrumentation);
+
+	node->sinstrument = palloc(size);
+	memcpy(node->sinstrument, sinstrument, size);
 }
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index cd54c1a74ac..627afafef05 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -72,6 +72,7 @@
 #include "postgres.h"
 
 #include "miscadmin.h"
+#include "executor/instrument_node.h"
 #include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/smgr.h"
@@ -107,6 +108,9 @@ struct ReadStream
 	bool		advice_enabled;
 	bool		temporary;
 
+	/* stats counters */
+	ReadStreamInstrumentation stats;
+
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
 	 * control problems when I/Os are split.
@@ -172,6 +176,38 @@ block_range_read_stream_cb(ReadStream *stream,
 	return InvalidBlockNumber;
 }
 
+/*
+ * read_stream_update_stats_prefetch
+ *		update read_stream stats with current pinned buffer depth
+ *
+ * Called once per buffer returned to the consumer in read_stream_next_buffer().
+ * Records the number of pinned buffers at that moment, so we can compute the
+ * average look-ahead depth.
+ */
+static inline void
+read_stream_update_stats_prefetch(ReadStream *stream)
+{
+	stream->stats.prefetch_count++;
+	stream->stats.distance_sum += stream->pinned_buffers;
+	if (stream->pinned_buffers > stream->stats.distance_max)
+		stream->stats.distance_max = stream->pinned_buffers;
+}
+
+/*
+ * read_stream_update_stats_io
+ *		update read_stream stats about size of I/O requests
+ *
+ * We count the number of I/O requests, their size (counted in blocks), and
+ * the number of in-progress I/Os at the time each request was issued.
+ */
+static inline void
+read_stream_update_stats_io(ReadStream *stream, int nblocks, int in_progress)
+{
+	stream->stats.io_count++;
+	stream->stats.io_nblocks += nblocks;
+	stream->stats.io_in_progress += in_progress;
+}
+
 /*
  * Ask the callback which block it would like us to read next, with a one block
  * buffer in front to allow read_stream_unget_block() to work.
@@ -380,6 +416,9 @@ read_stream_start_pending_read(ReadStream *stream)
 		Assert(stream->ios_in_progress < stream->max_ios);
 		stream->ios_in_progress++;
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
+
+		/* update I/O stats */
+		read_stream_update_stats_io(stream, nblocks, stream->ios_in_progress);
 	}
 
 	/*
@@ -703,6 +742,10 @@ read_stream_begin_impl(int flags,
 	stream->seq_until_processed = InvalidBlockNumber;
 	stream->temporary = SmgrIsTemp(smgr);
 
+	/* zero the stats, then set capacity */
+	memset(&stream->stats, 0, sizeof(ReadStreamInstrumentation));
+	stream->stats.distance_capacity = max_pinned_buffers;
+
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
 	 * reading the whole relation.  This way we start out assuming we'll be
@@ -851,6 +894,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 										flags)))
 			{
 				/* Fast return. */
+				read_stream_update_stats_prefetch(stream);
 				return buffer;
 			}
 
@@ -860,6 +904,9 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->ios_in_progress = 1;
 			stream->ios[0].buffer_index = oldest_buffer_index;
 			stream->seq_blocknum = next_blocknum + 1;
+
+			/* since we executed the I/O synchronously, count it as a stall */
+			stream->stats.stall_count += 1;
 		}
 		else
 		{
@@ -871,6 +918,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		}
 
 		stream->fast_path = false;
+		read_stream_update_stats_prefetch(stream);
 		return buffer;
 	}
 #endif
@@ -916,12 +964,17 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	{
 		int16		io_index = stream->oldest_io_index;
 		int32		distance;	/* wider temporary value, clamped below */
+		bool		needed_wait;
 
 		/* Sanity check that we still agree on the buffers. */
 		Assert(stream->ios[io_index].op.buffers ==
 			   &stream->buffers[oldest_buffer_index]);
 
-		WaitReadBuffers(&stream->ios[io_index].op);
+		needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
+
+		/* Count it as a stall if we needed to wait for the I/O */
+		if (needed_wait)
+			stream->stats.stall_count += 1;
 
 		Assert(stream->ios_in_progress > 0);
 		stream->ios_in_progress--;
@@ -981,6 +1034,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	}
 #endif
 
+	read_stream_update_stats_prefetch(stream);
+
 	/* Pin transferred to caller. */
 	Assert(stream->pinned_buffers > 0);
 	stream->pinned_buffers--;
@@ -1118,3 +1173,10 @@ read_stream_end(ReadStream *stream)
 	read_stream_reset(stream);
 	pfree(stream);
 }
+
+/* return a copy of the read_stream's instrumentation counters */
+ReadStreamInstrumentation
+read_stream_prefetch_stats(ReadStream *stream)
+{
+	return stream->stats;
+}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index ad993c07311..c8f428aece8 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -347,6 +347,7 @@ extern void heap_prepare_pagescan(TableScanDesc sscan);
 extern void heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 						bool allow_strat, bool allow_sync, bool allow_pagemode);
 extern void heap_endscan(TableScanDesc sscan);
+extern TableScanStats heap_scan_stats(TableScanDesc sscan);
 extern HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction);
 extern bool heap_getnextslot(TableScanDesc sscan,
 							 ScanDirection direction, TupleTableSlot *slot);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index ce340c076f8..cada9f28bd8 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -114,6 +114,33 @@ typedef struct ParallelBlockTableScanWorkerData
 } ParallelBlockTableScanWorkerData;
 typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker;
 
+/*
+ * Generic prefetch and I/O stats for table scans.
+ */
+typedef struct TableScanStatsData
+{
+	/* number of buffers returned to consumer (for averaging distance) */
+	uint64		prefetch_count;
+
+	/* sum of pinned_buffers sampled at each buffer return */
+	uint64		distance_sum;
+
+	/* maximum actual pinned_buffers observed during the scan */
+	int16		distance_max;
+
+	/* maximum possible look-ahead distance (max_pinned_buffers) */
+	int16		distance_capacity;
+
+	/* number of stalled reads (waiting for I/O) */
+	uint64		stall_count;
+
+	/* I/O stats */
+	uint64		io_count;		/* number of I/Os */
+	uint64		io_nblocks;		/* sum of blocks for all I/Os */
+	uint64		io_in_progress;	/* sum of in-progress I/Os */
+} TableScanStatsData;
+typedef struct TableScanStatsData *TableScanStats;
+
 /*
  * Base class for fetches from a table via an index. This is the base-class
  * for such scans, which needs to be embedded in the respective struct for
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 06084752245..ab84b76443c 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -380,6 +380,11 @@ typedef struct TableAmRoutine
 											  ScanDirection direction,
 											  TupleTableSlot *slot);
 
+	/*
+	 * Collect statistics about the table scan.
+	 */
+	TableScanStats		(*scan_stats) (TableScanDesc scan);
+
 	/* ------------------------------------------------------------------------
 	 * Parallel table scan related functions.
 	 * ------------------------------------------------------------------------
@@ -1006,6 +1011,18 @@ table_endscan(TableScanDesc scan)
 	scan->rs_rd->rd_tableam->scan_end(scan);
 }
 
+/*
+ * Fetch statistics about the table scan, or NULL if the AM provides none.
+ */
+static inline TableScanStats
+table_scan_stats(TableScanDesc scan)
+{
+	if (scan->rs_rd->rd_tableam->scan_stats)
+		return scan->rs_rd->rd_tableam->scan_stats(scan);
+
+	return NULL;
+}
+
 /*
  * Restart a relation scan.
  */
diff --git a/src/include/commands/explain_state.h b/src/include/commands/explain_state.h
index 0b695f7d812..801e422437b 100644
--- a/src/include/commands/explain_state.h
+++ b/src/include/commands/explain_state.h
@@ -54,6 +54,7 @@ typedef struct ExplainState
 	bool		summary;		/* print total planning and execution timing */
 	bool		memory;			/* print planner's memory usage information */
 	bool		settings;		/* print modified settings */
+	bool		io;				/* print info about I/O (prefetch, ...) */
 	bool		generic;		/* generate a generic plan */
 	ExplainSerializeOption serialize;	/* serialize the query's output? */
 	ExplainFormat format;		/* output format */
diff --git a/src/include/executor/instrument_node.h b/src/include/executor/instrument_node.h
index 8847d7f94fa..d94dbbc917d 100644
--- a/src/include/executor/instrument_node.h
+++ b/src/include/executor/instrument_node.h
@@ -41,9 +41,51 @@ typedef struct SharedAggInfo
 
 
 /* ---------------------
- *	Instrumentation information for indexscans (amgettuple and amgetbitmap)
+ *	Instrumentation information about read streams
  * ---------------------
  */
+typedef struct ReadStreamInstrumentation
+{
+	/* number of buffers returned to consumer (for averaging distance) */
+	uint64		prefetch_count;
+
+	/* sum of pinned_buffers sampled at each buffer return */
+	uint64		distance_sum;
+
+	/* maximum actual pinned_buffers observed during the scan */
+	int16		distance_max;
+
+	/* maximum possible look-ahead distance (max_pinned_buffers) */
+	int16		distance_capacity;
+
+	/* number of stalled reads (waiting for I/O) */
+	uint64		stall_count;
+
+	/* I/O stats */
+	uint64		io_count;		/* number of I/Os */
+	uint64		io_nblocks;		/* sum of blocks for all I/Os */
+	uint64		io_in_progress;	/* sum of in-progress I/Os */
+} ReadStreamInstrumentation;
+
+
+/* ---------------------
+ *	Instrumentation information for sequential scans
+ * ---------------------
+ */
+typedef struct SeqScanInstrumentation
+{
+	ReadStreamInstrumentation	stream;
+} SeqScanInstrumentation;
+
+/*
+ * Shared memory container for per-worker information
+ */
+typedef struct SharedSeqScanInstrumentation
+{
+	int			num_workers;
+	SeqScanInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedSeqScanInstrumentation;
+
 typedef struct IndexScanInstrumentation
 {
 	/* Index search count (incremented with pgstat_count_index_scan call) */
@@ -71,6 +113,7 @@ typedef struct BitmapHeapScanInstrumentation
 {
 	uint64		exact_pages;
 	uint64		lossy_pages;
+	ReadStreamInstrumentation	stream;
 } BitmapHeapScanInstrumentation;
 
 /*
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index 7a1490596fb..e2122bfffe3 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -27,5 +27,6 @@ extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
 extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
 extern void ExecSeqScanInitializeWorker(SeqScanState *node,
 										ParallelWorkerContext *pwcxt);
+extern void ExecSeqScanRetrieveInstrumentation(SeqScanState *node);
 
 #endif							/* NODESEQSCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 63c067d5aae..c81d18d8aec 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1635,6 +1635,8 @@ typedef struct SeqScanState
 {
 	ScanState	ss;				/* its first field is NodeTag */
 	Size		pscan_len;		/* size of parallel heap scan descriptor */
+	SeqScanInstrumentation	stats;
+	SharedSeqScanInstrumentation *sinstrument;
 } SeqScanState;
 
 /* ----------------
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index c9359b29b0f..ec396db5369 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -65,6 +65,7 @@
 
 struct ReadStream;
 typedef struct ReadStream ReadStream;
+typedef struct ReadStreamInstrumentation ReadStreamInstrumentation;
 
 /* for block_range_read_stream_cb */
 typedef struct BlockRangeReadStreamPrivate
@@ -104,4 +105,6 @@ extern void read_stream_resume(ReadStream *stream);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
+extern ReadStreamInstrumentation read_stream_prefetch_stats(ReadStream *stream);
+
 #endif							/* READ_STREAM_H */
-- 
2.53.0
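P.S. For reviewers, a minimal sketch (plain C, hypothetical names, not part of the patch) of how the averages shown in the EXPLAIN example derive from the ReadStreamInstrumentation counters: "Prefetch: avg" is distance_sum over prefetch_count, "I/O: size" is io_nblocks over io_count, and "I/O: inprogress" is io_in_progress over io_count.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical mirror of the counters kept in ReadStreamInstrumentation,
 * only to illustrate the arithmetic behind the EXPLAIN (IO) output.
 */
typedef struct stream_counters
{
	uint64_t	prefetch_count; /* buffers returned to the consumer */
	uint64_t	distance_sum;	/* sum of pinned_buffers at each return */
	uint64_t	stall_count;	/* reads that had to wait for I/O */
	uint64_t	io_count;		/* number of I/O requests issued */
	uint64_t	io_nblocks;		/* sum of blocks over all I/Os */
	uint64_t	io_in_progress; /* sum of in-progress I/Os at issue time */
} stream_counters;

/* "Prefetch: avg=..." -- average look-ahead distance in blocks */
static inline double
avg_distance(const stream_counters *c)
{
	return (c->prefetch_count > 0) ?
		(double) c->distance_sum / c->prefetch_count : 0.0;
}

/* "I/O: size=..." -- average request size, in BLCKSZ blocks */
static inline double
avg_io_size(const stream_counters *c)
{
	return (c->io_count > 0) ?
		(double) c->io_nblocks / c->io_count : 0.0;
}

/* "I/O: inprogress=..." -- average number of concurrent I/Os */
static inline double
avg_in_progress(const stream_counters *c)
{
	return (c->io_count > 0) ?
		(double) c->io_in_progress / c->io_count : 0.0;
}
```

So in the example above, stalls=653 is stall_count printed verbatim, while size=15.983 and inprogress=15.934 are the two io_* ratios.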
