Hi Tomas,

On Sun, Mar 15, 2026 at 12:49 PM Tomas Vondra <[email protected]> wrote:
> Here's a patch adding prefetch/read-ahead info about IO to EXPLAIN. This
> was initially developed as part of the index prefetch patches, but it
> became clear it can be useful for other nodes using ReadStream. That
> includes seqscan and bitmap heap scans, and (hopefully) will include
> index scan in PG19.
I think surfacing this to users makes a lot of sense - read streams are
hard to understand in detail, and this will make it easier to tie the
I/O performance of a query back to what happened in terms of
pre-fetching and stalls. Big +1 on the concept of making this visible.

> In EXPLAIN, this new info is enabled by a new option "IO", which adds
> two new lines to the output:

I'm 50/50 on whether hiding this behind a new option really makes sense
- if it's cheap enough to always capture, why not always show it? E.g.
we could do with this what we did with BUFFERS recently, which is to
enable it by default. If someone finds that too visually busy, they
could still do IO OFF.

This also makes me wonder a bit what we should do about I/O timings.
Conceptually they'd now belong closer to IO than to BUFFERS.

>
> --------------------
> EXPLAIN (ANALYZE, COSTS off, TIMING off, IO)
> SELECT * FROM t WHERE a < 100000;
>
>                    QUERY PLAN
> -------------------------------------------------
>  Seq Scan on t (actual rows=999996.00 loops=1)
>    Filter: (a < 100000)
>    Rows Removed by Filter: 4
>    Prefetch: avg=262.629 max=271 capacity=272
>    I/O: stalls=653 size=15.983 inprogress=15.934
>    Buffers: shared read=55556
>  Planning:
>    Buffers: shared hit=50 read=23
>  Planning Time: 7.358 ms
>  Execution Time: 358.214 ms
> (10 rows)
> --------------------
>
> The first line "Prefetch" tracks the look-ahead distance, i.e. how many
> blocks ahead the ReadStream is requesting.
>
> The second line "I/O" is about the I/O requests actually issued - how
> many times we had to wait for the block (when we get to process it),
> average size of a request (in BLCKSZ blocks), and average number of
> in-progress requests.

I wonder if we could somehow consolidate this into one line for the
text format (specifically, moving the prefetch info into "I/O" at the
end)? I'm also not sure whether "max" is really that useful, vs.
capacity.

> 1) The information is collected by ReadStream, unconditionally.
>
> I experimented with having a flag to "request" collecting these stats
> (so that it can be done only for EXPLAIN ANALYZE), but that turned out
> pretty useless. The stats are super cheap, so the extra flag did not
> make a meaningful difference. And passing the flag to the scan/stream
> was way too invasive. So all ReadStreams track it.

Makes sense. I did some testing with your patch against master, and
couldn't find any meaningful difference - it's logical that this would
be cheap enough to always do.

> 2) This is inherently a TAM implementation detail.
>
> ...
>
> 3) Adds shared instrumentation to SeqScan workers.
>
> The patch also had to improve a parallel SeqScan to collect per-worker
> instrumentation, similarly to a BitmapHeapScan. Until now this was not
> needed, but it's needed for tracking per-worker prefetch stats.

I feel like something is off about the complexity of having each node
type ferry back this information - e.g. when you implement support for
index prefetching, it'll require a bunch more changes. In my mind,
there is a reason we solved the related problem globally with the
current pgBufferUsage, instead of dealing with it on a per-node basis.
I really feel we should have a more generic way of dealing with this.

I'm not completely unbiased here, because I think this would be a great
fit for the stack-based instrumentation I've been discussing with
Andres and Zsolt over at [0]. Andres at least expressed some potential
interest in getting that into 19, though it's definitely not a trivial
patch set.

If we were to get stack-based instrumentation in, we could easily add a
new "IOUsage" to the Instrumentation struct, avoid any modification to
the TAM interface, and align it with how we treat BufferUsage and
WALUsage.
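To make that shape concrete, here's a minimal sketch of what I have in
mind - note that all names here (IOUsage, its counter fields,
IOUsageAdd) are hypothetical, neither patch set defines them yet:

```c
#include <stdint.h>

/*
 * Hypothetical counter set a ReadStream could bump, sitting in the
 * Instrumentation struct next to bufusage/walusage. (In core this
 * would use the int64 typedef rather than int64_t.)
 */
typedef struct IOUsage
{
    int64_t     stalls;         /* times we had to wait for a block */
    int64_t     reads;          /* read requests actually issued */
    int64_t     read_blocks;    /* blocks covered by those requests */
} IOUsage;

/*
 * Mirrors BufferUsageAdd()/WalUsageAdd(): the leader folds one
 * worker's counters into its own running totals.
 */
static void
IOUsageAdd(IOUsage *dst, const IOUsage *add)
{
    dst->stalls += add->stalls;
    dst->reads += add->reads;
    dst->read_blocks += add->read_blocks;
}
```

With something like that in Instrumentation, InstrAccumParallelQuery()
would only gain one extra IOUsageAdd() call, and nothing about the TAM
interface or the per-node parallel plumbing would need to change.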
I've attached a prototype of what that could look like (apply the other
patch set first, v8, see commitfest entry [1] - also attached is a
preparatory refactoring that uses "Instrumentation" for parallel query
reporting, which avoids having individual structs there).

Performance is actually better for "EXPLAIN (ANALYZE, IO, BUFFERS OFF,
TIMING OFF)" with that patch set on a quick COUNT(*) test, but that's
mostly due to the overhead of ExecProcNode dispatching that I fixed in
the other patch set (v8/0006). I think this would otherwise be similar
in performance, thanks to the stack avoiding any "accum diff" logic.

It's also worth noting that this would make it trivial to output this
information for utility commands that have read stream support, or to
show aggregate statistics in pg_stat_statements etc.

Let me know if that's of interest - I'd be happy to pair up on this to
make it work.

Thanks,
Lukas

[0]: https://www.postgresql.org/message-id/flat/CAP53PkzdBK8VJ1fS4AZ481LgMN8f9mJiC39ZRHqkFUSYq6KWmg%40mail.gmail.com
[1]: https://commitfest.postgresql.org/patch/6023/

--
Lukas Fittl
From 418d18fd09930bcd89dccba7e9fe5ce3322184a4 Mon Sep 17 00:00:00 2001 From: Lukas Fittl <[email protected]> Date: Sun, 15 Mar 2026 21:44:58 -0700 Subject: [PATCH vnocfbot 2/3] Use Instrumentation struct for parallel worker communication This simplifies the allocations a bit, since we don't need to separately allocate WAL and buffer usage, and allows the easier addition of a planned third struct in Instrumentation. Author: Lukas Fittl <[email protected]> Reviewed-by: Discussion: https://postgr.es/m/ --- src/backend/access/brin/brin.c | 43 ++++++----------- src/backend/access/gin/gininsert.c | 43 ++++++----------- src/backend/access/nbtree/nbtsort.c | 43 ++++++----------- src/backend/commands/vacuumparallel.c | 52 ++++++++------------- src/backend/executor/execParallel.c | 66 ++++++++++++--------------- src/backend/executor/instrument.c | 14 +++--- src/include/executor/execParallel.h | 5 +- src/include/executor/instrument.h | 4 +- 8 files changed, 99 insertions(+), 171 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index d62eb7dee9..8b156365ee 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -49,8 +49,7 @@ #define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001) #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xB000000000000004) /* * Status for index builds performed in parallel. 
This is allocated in a @@ -146,8 +145,7 @@ typedef struct BrinLeader BrinShared *brinshared; Sharedsort *sharedsort; Snapshot snapshot; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; } BrinLeader; /* @@ -2385,8 +2383,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, BrinShared *brinshared; Sharedsort *sharedsort; BrinLeader *brinleader = palloc0_object(BrinLeader); - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; bool leaderparticipates = true; int querylen; @@ -2428,18 +2425,14 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, shm_toc_estimate_keys(&pcxt->estimator, 2); /* - * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE - * and PARALLEL_KEY_BUFFER_USAGE. + * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. * * If there are no extensions loaded that care, we could skip this. We * have no way of knowing whether anyone's looking at instrumentation, so * do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ @@ -2512,15 +2505,12 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, } /* - * Allocate space for each worker's WalUsage and BufferUsage; no need to + * Allocate space for each worker's Instrumentation; no need to * initialize. 
*/ - walusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); - bufferusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -2531,8 +2521,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, brinleader->brinshared = brinshared; brinleader->sharedsort = sharedsort; brinleader->snapshot = snapshot; - brinleader->walusage = walusage; - brinleader->bufferusage = bufferusage; + brinleader->instr = instr; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -2571,7 +2560,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) * or we might get incomplete data.) 
*/ for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); + InstrAccumParallelQuery(&brinleader->instr[i]); /* Free last reference to MVCC snapshot, if one was used */ if (IsMVCCSnapshot(brinleader->snapshot)) @@ -2885,8 +2874,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) LOCKMODE heapLockmode; LOCKMODE indexLockmode; QueryInstrumentation *instr; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *worker_instr; int sortmem; /* @@ -2947,11 +2935,8 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) heapRel, indexRel, sortmem, false); /* Report WAL/buffer usage during parallel execution */ - bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(instr, - &bufferusage[ParallelWorkerNumber], - &walusage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(instr, &worker_instr[ParallelWorkerNumber]); index_close(indexRel, indexLockmode); table_close(heapRel, heapLockmode); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 8cdcd2a9be..db8235944b 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -42,8 +42,7 @@ #define PARALLEL_KEY_GIN_SHARED UINT64CONST(0xB000000000000001) #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xB000000000000004) /* * Status for index builds performed in parallel. 
This is allocated in a @@ -135,8 +134,7 @@ typedef struct GinLeader GinBuildShared *ginshared; Sharedsort *sharedsort; Snapshot snapshot; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; } GinLeader; typedef struct @@ -942,8 +940,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, GinBuildShared *ginshared; Sharedsort *sharedsort; GinLeader *ginleader = palloc0_object(GinLeader); - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; bool leaderparticipates = true; int querylen; @@ -984,18 +981,14 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, shm_toc_estimate_keys(&pcxt->estimator, 2); /* - * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE - * and PARALLEL_KEY_BUFFER_USAGE. + * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. * * If there are no extensions loaded that care, we could skip this. We * have no way of knowing whether anyone's looking at instrumentation, so * do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ @@ -1063,15 +1056,12 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, } /* - * Allocate space for each worker's WalUsage and BufferUsage; no need to + * Allocate space for each worker's Instrumentation; no need to * initialize. 
*/ - walusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); - bufferusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -1082,8 +1072,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, ginleader->ginshared = ginshared; ginleader->sharedsort = sharedsort; ginleader->snapshot = snapshot; - ginleader->walusage = walusage; - ginleader->bufferusage = bufferusage; + ginleader->instr = instr; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -1122,7 +1111,7 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) * or we might get incomplete data.) 
*/ for (i = 0; i < ginleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]); + InstrAccumParallelQuery(&ginleader->instr[i]); /* Free last reference to MVCC snapshot, if one was used */ if (IsMVCCSnapshot(ginleader->snapshot)) @@ -2115,8 +2104,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) LOCKMODE heapLockmode; LOCKMODE indexLockmode; QueryInstrumentation *instr; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *worker_instr; int sortmem; /* @@ -2196,11 +2184,8 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) heapRel, indexRel, sortmem, false); /* Report WAL/buffer usage during parallel execution */ - bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(instr, - &bufferusage[ParallelWorkerNumber], - &walusage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(instr, &worker_instr[ParallelWorkerNumber]); index_close(indexRel, indexLockmode); table_close(heapRel, heapLockmode); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index dfe4fd9459..c56da36023 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -65,8 +65,7 @@ #define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002) #define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xA000000000000005) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xA000000000000006) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xA000000000000005) /* * DISABLE_LEADER_PARTICIPATION disables the leader's participation in @@ -194,8 +193,7 @@ typedef struct BTLeader Sharedsort *sharedsort; Sharedsort *sharedsort2; Snapshot snapshot; - WalUsage 
*walusage; - BufferUsage *bufferusage; + Instrumentation *instr; } BTLeader; /* @@ -1407,8 +1405,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) Sharedsort *sharedsort2; BTSpool *btspool = buildstate->spool; BTLeader *btleader = palloc0_object(BTLeader); - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *instr; bool leaderparticipates = true; int querylen; @@ -1461,18 +1458,14 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) } /* - * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE - * and PARALLEL_KEY_BUFFER_USAGE. + * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. * * If there are no extensions loaded that care, we could skip this. We * have no way of knowing whether anyone's looking at instrumentation, so * do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ @@ -1559,15 +1552,12 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) } /* - * Allocate space for each worker's WalUsage and BufferUsage; no need to + * Allocate space for each worker's Instrumentation; no need to * initialize. 
*/ - walusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); - bufferusage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -1579,8 +1569,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) btleader->sharedsort = sharedsort; btleader->sharedsort2 = sharedsort2; btleader->snapshot = snapshot; - btleader->walusage = walusage; - btleader->bufferusage = bufferusage; + btleader->instr = instr; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -1619,7 +1608,7 @@ _bt_end_parallel(BTLeader *btleader) * or we might get incomplete data.) 
*/ for (i = 0; i < btleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]); + InstrAccumParallelQuery(&btleader->instr[i]); /* Free last reference to MVCC snapshot, if one was used */ if (IsMVCCSnapshot(btleader->snapshot)) @@ -1753,8 +1742,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) LOCKMODE heapLockmode; LOCKMODE indexLockmode; QueryInstrumentation *instr; - WalUsage *walusage; - BufferUsage *bufferusage; + Instrumentation *worker_instr; int sortmem; #ifdef BTREE_BUILD_STATS @@ -1836,11 +1824,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) sharedsort2, sortmem, false); /* Report WAL/buffer usage during parallel execution */ - bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(instr, - &bufferusage[ParallelWorkerNumber], - &walusage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(instr, &worker_instr[ParallelWorkerNumber]); #ifdef BTREE_BUILD_STATS if (log_btree_build_stats) diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 75074fe4ef..753dd965d7 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -47,9 +47,8 @@ */ #define PARALLEL_VACUUM_KEY_SHARED 1 #define PARALLEL_VACUUM_KEY_QUERY_TEXT 2 -#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 3 -#define PARALLEL_VACUUM_KEY_WAL_USAGE 4 -#define PARALLEL_VACUUM_KEY_INDEX_STATS 5 +#define PARALLEL_VACUUM_KEY_INSTRUMENTATION 3 +#define PARALLEL_VACUUM_KEY_INDEX_STATS 4 /* * Shared information among parallel workers. 
So this is allocated in the DSM @@ -188,11 +187,8 @@ struct ParallelVacuumState /* Shared dead items space among parallel vacuum workers */ TidStore *dead_items; - /* Points to buffer usage area in DSM */ - BufferUsage *buffer_usage; - - /* Points to WAL usage area in DSM */ - WalUsage *wal_usage; + /* Points to instrumentation area in DSM */ + Instrumentation *instr; /* * False if the index is totally unsuitable target for all parallel @@ -250,8 +246,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PVShared *shared; TidStore *dead_items; PVIndStats *indstats; - BufferUsage *buffer_usage; - WalUsage *wal_usage; + Instrumentation *instr; bool *will_parallel_vacuum; Size est_indstats_len; Size est_shared_len; @@ -304,18 +299,15 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shm_toc_estimate_keys(&pcxt->estimator, 1); /* - * Estimate space for BufferUsage and WalUsage -- - * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE. + * Estimate space for Instrumentation -- + * PARALLEL_VACUUM_KEY_INSTRUMENTATION. * * If there are no extensions loaded that care, we could skip this. We * have no way of knowing whether anyone's looking at instrumentation, so * do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ @@ -396,17 +388,13 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, pvs->shared = shared; /* - * Allocate space for each worker's BufferUsage and WalUsage; no need to - * initialize + * Allocate space for each worker's Instrumentation; no need to + * initialize. 
*/ - buffer_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); - pvs->buffer_usage = buffer_usage; - wal_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); - pvs->wal_usage = wal_usage; + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INSTRUMENTATION, instr); + pvs->instr = instr; /* Store query string for workers */ if (debug_query_string) @@ -738,7 +726,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan WaitForParallelWorkersToFinish(pvs->pcxt); for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); + InstrAccumParallelQuery(&pvs->instr[i]); } /* @@ -996,8 +984,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) PVShared *shared; TidStore *dead_items; QueryInstrumentation *instr; - BufferUsage *buffer_usage; - WalUsage *wal_usage; + Instrumentation *worker_instr; int nindexes; char *sharedquery; ErrorContextCallback errcallback; @@ -1091,11 +1078,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) parallel_vacuum_process_safe_indexes(&pvs); /* Report buffer/WAL usage during parallel execution */ - buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); - wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); - InstrEndParallelQuery(instr, - &buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); + worker_instr = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_INSTRUMENTATION, false); + InstrEndParallelQuery(instr, &worker_instr[ParallelWorkerNumber]); /* Report any remaining cost-based vacuum delay time */ if (track_cost_delay_timing) diff --git a/src/backend/executor/execParallel.c 
b/src/backend/executor/execParallel.c index 73534fa6c7..ebab6bc165 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -60,13 +60,12 @@ #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001) #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002) #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000004) #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005) -#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006) +#define PARALLEL_KEY_NODE_INSTRUMENTATION UINT64CONST(0xE000000000000006) #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) -#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -625,8 +624,6 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_data; char *pstmt_space; char *paramlistinfo_space; - BufferUsage *bufusage_space; - WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; @@ -690,21 +687,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_keys(&pcxt->estimator, 1); /* - * Estimate space for BufferUsage. + * Estimate space for Instrumentation. * * If EXPLAIN is not in use and there are no extensions loaded that care, * we could skip this. But we have no way of knowing whether anyone's * looking at instrumentation, so do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* - * Same thing for WalUsage. 
- */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); + mul_size(sizeof(Instrumentation), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Estimate space for tuple queues. */ @@ -790,17 +780,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space); - /* Allocate space for each worker's BufferUsage; no need to initialize. */ - bufusage_space = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); - pei->buffer_usage = bufusage_space; + /* + * Allocate space for each worker's Instrumentation; no need to + * initialize. + */ + { + Instrumentation *instr; - /* Same for WalUsage. */ - walusage_space = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space); - pei->wal_usage = walusage_space; + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); + pei->instrumentation = instr; + } /* Set up the tuple queues that the workers will write into. 
*/ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); @@ -826,9 +817,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, instrument = GetInstrumentationArray(instrumentation); for (i = 0; i < nworkers * e.nnodes; ++i) InstrInitNode(&instrument[i], estate->es_instrument); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, + shm_toc_insert(pcxt->toc, PARALLEL_KEY_NODE_INSTRUMENTATION, instrumentation); - pei->instrumentation = instrumentation; + pei->node_instrumentation = instrumentation; if (estate->es_jit_flags != PGJIT_NONE) { @@ -1230,7 +1221,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei) * finish, or we might get incomplete data.) */ for (i = 0; i < nworkers; i++) - InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); + InstrAccumParallelQuery(&pei->instrumentation[i]); pei->finished = true; } @@ -1244,10 +1235,10 @@ ExecParallelFinish(ParallelExecutorInfo *pei) void ExecParallelCleanup(ParallelExecutorInfo *pei) { - /* Accumulate instrumentation, if any. */ - if (pei->instrumentation) + /* Accumulate node instrumentation, if any. */ + if (pei->node_instrumentation) ExecParallelRetrieveInstrumentation(pei->planstate, - pei->instrumentation); + pei->node_instrumentation); /* Accumulate JIT instrumentation, if any. */ if (pei->jit_instrumentation) @@ -1471,8 +1462,6 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { FixedParallelExecutorState *fpes; QueryInstrumentation *instr; - BufferUsage *buffer_usage; - WalUsage *wal_usage; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1487,7 +1476,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. 
 	 */
 	receiver = ExecParallelGetReceiver(seg, toc);
-	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
+	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_NODE_INSTRUMENTATION, true);
 	if (instrumentation != NULL)
 		instrument_options = instrumentation->instrument_options;
 	jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
@@ -1545,11 +1534,12 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	ExecutorFinish(queryDesc);
 
 	/* Report buffer/WAL usage during parallel execution. */
-	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
-	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
-	InstrEndParallelQuery(instr,
-						  &buffer_usage[ParallelWorkerNumber],
-						  &wal_usage[ParallelWorkerNumber]);
+	{
+		Instrumentation *worker_instr;
+
+		worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false);
+		InstrEndParallelQuery(instr, &worker_instr[ParallelWorkerNumber]);
+	}
 
 	/* Report instrumentation data if any instrumentation options are set. */
 	if (instrumentation != NULL)
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 2727e7b5ce..61297c5653 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -286,11 +286,11 @@ InstrStartParallelQuery(void)
 
 /* report usage after parallel executor shutdown */
 void
-InstrEndParallelQuery(QueryInstrumentation *qinstr, BufferUsage *bufusage, WalUsage *walusage)
+InstrEndParallelQuery(QueryInstrumentation *qinstr, Instrumentation *dst)
 {
 	qinstr = InstrQueryStopFinalize(qinstr);
-	memcpy(bufusage, &qinstr->instr.bufusage, sizeof(BufferUsage));
-	memcpy(walusage, &qinstr->instr.walusage, sizeof(WalUsage));
+	memcpy(&dst->bufusage, &qinstr->instr.bufusage, sizeof(BufferUsage));
+	memcpy(&dst->walusage, &qinstr->instr.walusage, sizeof(WalUsage));
 }
 
 /*
@@ -306,12 +306,12 @@ InstrEndParallelQuery(QueryInstrumentation *qinstr, BufferUsage *bufusage, WalUs
  * activity is accumulated.
  */
 void
-InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
+InstrAccumParallelQuery(Instrumentation *instr)
 {
-	BufferUsageAdd(&instr_stack.current->bufusage, bufusage);
-	WalUsageAdd(&instr_stack.current->walusage, walusage);
+	BufferUsageAdd(&instr_stack.current->bufusage, &instr->bufusage);
+	WalUsageAdd(&instr_stack.current->walusage, &instr->walusage);
 
-	WalUsageAdd(&pgWalUsage, walusage);
+	WalUsageAdd(&pgWalUsage, &instr->walusage);
 }
 
 /* Node instrumentation handling */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a2034811d..6c8b602d07 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -25,9 +25,8 @@ typedef struct ParallelExecutorInfo
 {
 	PlanState  *planstate;		/* plan subtree we're running in parallel */
 	ParallelContext *pcxt;		/* parallel context we're using */
-	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
-	WalUsage   *wal_usage;		/* walusage area in DSM */
-	SharedExecutorInstrumentation *instrumentation; /* optional */
+	Instrumentation *instrumentation;	/* instrumentation area in DSM */
+	SharedExecutorInstrumentation *node_instrumentation;	/* optional */
 	struct SharedJitInstrumentation *jit_instrumentation;	/* optional */
 	dsa_area   *area;			/* points to DSA area in DSM */
 	dsa_pointer param_exec;		/* serialized PARAM_EXEC parameters */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index bce58acb11..e4fc9e7870 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -275,8 +275,8 @@ extern QueryInstrumentation *InstrQueryStopFinalize(QueryInstrumentation *instr)
 extern void InstrQueryRememberNode(QueryInstrumentation *parent, NodeInstrumentation *instr);
 
 pg_nodiscard extern QueryInstrumentation *InstrStartParallelQuery(void);
-extern void InstrEndParallelQuery(QueryInstrumentation *qinstr, BufferUsage *bufusage, WalUsage *walusage);
-extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
+extern void InstrEndParallelQuery(QueryInstrumentation *qinstr, Instrumentation *dst);
+extern void InstrAccumParallelQuery(Instrumentation *instr);
 
 extern NodeInstrumentation *InstrAllocNode(int instrument_options, bool async_mode);
-- 
2.43.0
From 6c03de986842a0707890dff3668b9d8273f44b0f Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 3 Mar 2026 16:50:50 -0500
Subject: [PATCH vnocfbot 1/3] bufmgr: Return whether WaitReadBuffers() needed
 to wait

In a subsequent commit read_stream.c will use this as an input to the
read ahead distance.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/buffer/bufmgr.c | 18 +++++++++++++++++-
 src/include/storage/bufmgr.h        |  2 +-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 41a0baa344..15a11108b8 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1738,12 +1738,20 @@ ProcessReadBuffersResult(ReadBuffersOperation *operation)
 	Assert(operation->nblocks_done <= operation->nblocks);
 }
 
-void
+/*
+ * Wait for the IO operation initiated by StartReadBuffers() et al to
+ * complete.
+ *
+ * Returns whether we actually had to wait, i.e. whether the IO had not
+ * already completed by the time of this call.
+ */
+bool
 WaitReadBuffers(ReadBuffersOperation *operation)
 {
 	PgAioReturn *aio_ret = &operation->io_return;
 	IOContext	io_context;
 	IOObject	io_object;
+	bool		needed_wait = false;
 
 	if (operation->persistence == RELPERSISTENCE_TEMP)
 	{
@@ -1805,6 +1813,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 			instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
 
 			pgaio_wref_wait(&operation->io_wref);
+			needed_wait = true;
 
 			/*
 			 * The IO operation itself was already counted earlier, in
@@ -1835,6 +1844,12 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 
 		CHECK_FOR_INTERRUPTS();
 
+		/*
+		 * If the IO completed only partially, we need to perform additional
+		 * work, consider that a form of having had to wait.
+		 */
+		needed_wait = true;
+
 		/*
 		 * This may only complete the IO partially, either because some
 		 * buffers were already valid, or because of a partial read.
@@ -1851,6 +1866,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	CheckReadBuffersOperation(operation, true);
 
 	/* NB: READ_DONE tracepoint was already executed in completion callback */
+	return needed_wait;
 }
 
 /*
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 4017896f95..a1ef04354d 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -249,7 +249,7 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation,
 							 BlockNumber blockNum,
 							 int *nblocks,
 							 int flags);
-extern void WaitReadBuffers(ReadBuffersOperation *operation);
+extern bool WaitReadBuffers(ReadBuffersOperation *operation);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 
-- 
2.43.0
From 08821a5db5b7e78dd31f1911253d907a391c6b69 Mon Sep 17 00:00:00 2001
From: Lukas Fittl <[email protected]>
Date: Sun, 15 Mar 2026 23:29:15 -0700
Subject: [PATCH vnocfbot 3/3] instrumentation: Track I/O prefetch info and
 show with EXPLAIN (IO)

This adds details about AIO / prefetch for executor nodes using the
ReadStream API, currently SeqScan and BitmapHeapScan, into the new
IOUsage information added to Instrumentation. This can be viewed through
the new EXPLAIN (IO) information, or could be tracked by other
interested callers through the stack-based instrumentation mechanism.

The ReadStream tracks the statistics unconditionally, i.e. even outside
EXPLAIN ANALYZE etc. The amount of statistics is trivial (a handful of
integer counters), it's not worth gating this by a flag.

Author: Lukas Fittl <[email protected]>
Author: Tomas Vondra <[email protected]>
Reviewed By:
Discussion: https://www.postgresql.org/message-id/flat/a177a6dd-240b-455a-8f25-aca0b1c08c6e%40vondra.me
---
 src/backend/commands/explain.c        | 75 +++++++++++++++++++++++++--
 src/backend/commands/explain_state.c  |  8 +++
 src/backend/executor/execMain.c       |  6 +--
 src/backend/executor/instrument.c     | 54 ++++++++++++++-----
 src/backend/storage/aio/read_stream.c | 33 +++++++++++-
 src/include/commands/explain_state.h  |  1 +
 src/include/executor/instrument.h     | 31 +++++++++++
 src/tools/pgindent/typedefs.list      |  1 +
 8 files changed, 189 insertions(+), 20 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 8a641f9d05..6eca0e2051 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -145,6 +145,7 @@ static const char *explain_get_index_name(Oid indexId);
 static bool peek_buffer_usage(ExplainState *es, const BufferUsage *usage);
 static void show_buffer_usage(ExplainState *es, const BufferUsage *usage, const char *title);
 static void show_wal_usage(ExplainState *es, const WalUsage *usage);
+static void show_io_usage(ExplainState *es, const IOUsage *usage);
 static void show_memory_counters(ExplainState *es,
 								 const MemoryContextCounters *mem_counters);
 static void show_result_replacement_info(Result *result, ExplainState *es);
@@ -509,6 +510,8 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 		instrument_option |= INSTRUMENT_BUFFERS;
 	if (es->wal)
 		instrument_option |= INSTRUMENT_WAL;
+	if (es->io)
+		instrument_option |= INSTRUMENT_IO;
 
 	/*
 	 * We always collect timing for the entire statement, even when node-level
@@ -2281,14 +2284,16 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		}
 	}
 
-	/* Show buffer/WAL usage */
+	/* Show buffer/WAL/IO usage */
 	if (es->buffers && planstate->instrument)
 		show_buffer_usage(es, &planstate->instrument->instr.bufusage, NULL);
 	if (es->wal && planstate->instrument)
 		show_wal_usage(es, &planstate->instrument->instr.walusage);
+	if (es->io && planstate->instrument)
+		show_io_usage(es, &planstate->instrument->instr.iousage);
 
-	/* Prepare per-worker buffer/WAL usage */
-	if (es->workers_state && (es->buffers || es->wal) && es->verbose)
+	/* Prepare per-worker buffer/WAL/IO usage */
+	if (es->workers_state && (es->buffers || es->wal || es->io) && es->verbose)
 	{
 		WorkerNodeInstrumentation *w = planstate->worker_instrument;
 
@@ -2305,6 +2310,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_buffer_usage(es, &instrument->instr.bufusage, NULL);
 			if (es->wal)
 				show_wal_usage(es, &instrument->instr.walusage);
+			if (es->io)
+				show_io_usage(es, &instrument->instr.iousage);
 			ExplainCloseWorker(n, es);
 		}
 	}
@@ -4343,6 +4350,68 @@ show_wal_usage(ExplainState *es, const WalUsage *usage)
 	}
 }
 
+/*
+ * Show I/O prefetch usage details.
+ */
+static void
+show_io_usage(ExplainState *es, const IOUsage *usage)
+{
+	/* Nothing to show if no buffers were returned */
+	if (usage->count <= 0)
+		return;
+
+	if (es->format == EXPLAIN_FORMAT_TEXT)
+	{
+		/* prefetch distance info */
+		ExplainIndentText(es);
+		appendStringInfo(es->str, "Prefetch: avg=%.3f max=%" PRId64 " capacity=%" PRId64,
+						 (usage->distance_sum * 1.0 / usage->count),
+						 usage->distance_max,
+						 usage->distance_capacity);
+		appendStringInfoChar(es->str, '\n');
+
+		/* prefetch I/O info (only if there were actual I/Os) */
+		if (usage->stall_count > 0 || usage->io_count > 0)
+		{
+			ExplainIndentText(es);
+			appendStringInfo(es->str, "I/O: stalls=%" PRId64,
+							 usage->stall_count);
+
+			if (usage->io_count > 0)
+			{
+				appendStringInfo(es->str, " size=%.3f inprogress=%.3f",
+								 (usage->io_blocks * 1.0 / usage->io_count),
+								 (usage->ios_in_progress * 1.0 / usage->io_count));
+			}
+
+			appendStringInfoChar(es->str, '\n');
+		}
+	}
+	else
+	{
+		ExplainOpenGroup("Prefetch", "I/O", true, es);
+
+		ExplainPropertyFloat("Average Distance", NULL,
+							 (usage->distance_sum * 1.0 / usage->count), 3, es);
+		ExplainPropertyInteger("Max Distance", NULL,
+							   usage->distance_max, es);
+		ExplainPropertyInteger("Capacity", NULL,
+							   usage->distance_capacity, es);
+		ExplainPropertyInteger("Stalls", NULL,
+							   usage->stall_count, es);
+
+		if (usage->io_count > 0)
+		{
+			ExplainPropertyFloat("Average IO Size", NULL,
+								 (usage->io_blocks * 1.0 / usage->io_count), 3, es);
+			ExplainPropertyFloat("Average IOs In Progress", NULL,
+								 (usage->ios_in_progress * 1.0 / usage->io_count), 3, es);
+		}
+
+		ExplainCloseGroup("Prefetch", "I/O", true, es);
+	}
+}
+
 /*
  * Show memory usage details.
  */
diff --git a/src/backend/commands/explain_state.c b/src/backend/commands/explain_state.c
index 77f59b8e50..b5129f4914 100644
--- a/src/backend/commands/explain_state.c
+++ b/src/backend/commands/explain_state.c
@@ -115,6 +115,8 @@ ParseExplainOptionList(ExplainState *es, List *options, ParseState *pstate)
 		}
 		else if (strcmp(opt->defname, "memory") == 0)
 			es->memory = defGetBoolean(opt);
+		else if (strcmp(opt->defname, "io") == 0)
+			es->io = defGetBoolean(opt);
 		else if (strcmp(opt->defname, "serialize") == 0)
 		{
 			if (opt->arg)
@@ -185,6 +187,12 @@ ParseExplainOptionList(ExplainState *es, List *options, ParseState *pstate)
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("EXPLAIN option %s requires ANALYZE", "TIMING")));
 
+	/* check that IO is used with EXPLAIN ANALYZE */
+	if (es->io && !es->analyze)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("EXPLAIN option %s requires ANALYZE", "IO")));
+
 	/* check that serialize is used with EXPLAIN ANALYZE */
 	if (es->serialize != EXPLAIN_SERIALIZE_NONE && !es->analyze)
 		ereport(ERROR,
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 82253317e9..de0795c944 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -334,7 +334,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 	 * Start up required top-level instrumentation stack for WAL/buffer
 	 * tracking
 	 */
-	if (!queryDesc->totaltime && (estate->es_instrument & (INSTRUMENT_BUFFERS | INSTRUMENT_WAL)))
+	if (!queryDesc->totaltime && (estate->es_instrument & (INSTRUMENT_BUFFERS | INSTRUMENT_WAL | INSTRUMENT_IO)))
 		queryDesc->totaltime = InstrQueryAlloc(estate->es_instrument);
 
 	if (queryDesc->totaltime)
@@ -347,7 +347,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 		 * after the first call to InstrQueryStart has pushed the parent
 		 * entry.
 		 */
-		if ((estate->es_instrument & (INSTRUMENT_BUFFERS | INSTRUMENT_WAL)) &&
+		if ((estate->es_instrument & (INSTRUMENT_BUFFERS | INSTRUMENT_WAL | INSTRUMENT_IO)) &&
 			!queryDesc->already_executed)
 			ExecRememberNodeInstrumentation(queryDesc->planstate,
 											queryDesc->totaltime);
@@ -1535,7 +1535,7 @@ ExecFinalizeTriggerInstrumentation(EState *estate)
 		{
 			TriggerInstrumentation *ti = rInfo->ri_TrigInstrument;
 
-			if (ti && (ti->instr.need_bufusage || ti->instr.need_walusage))
+			if (ti && (ti->instr.need_bufusage || ti->instr.need_walusage || ti->instr.need_iousage))
 				InstrAccum(instr_stack.current, &ti->instr);
 		}
 	}
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 61297c5653..c8f182e7c5 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -46,6 +46,7 @@ InstrInitOptions(Instrumentation *instr, int instrument_options)
 {
 	instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0;
 	instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0;
+	instr->need_iousage = (instrument_options & INSTRUMENT_IO) != 0;
 	instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
 }
 
@@ -76,7 +77,7 @@ InstrStart(Instrumentation *instr)
 	if (instr->need_timer)
 		InstrStartTimer(instr);
 
-	if (instr->need_bufusage || instr->need_walusage)
+	if (instr->need_bufusage || instr->need_walusage || instr->need_iousage)
 		InstrPushStack(instr);
 }
 
@@ -86,7 +87,7 @@ InstrStop(Instrumentation *instr)
 	if (instr->need_timer)
 		InstrStopTimer(instr);
 
-	if (instr->need_bufusage || instr->need_walusage)
+	if (instr->need_bufusage || instr->need_walusage || instr->need_iousage)
 		InstrPopStack(instr);
 }
 
@@ -197,7 +198,9 @@ InstrQueryAlloc(int instrument_options)
 	 * survives transaction abort — ResourceOwner release needs to access
 	 * it.
 	 */
-	if ((instrument_options & INSTRUMENT_BUFFERS) != 0 || (instrument_options & INSTRUMENT_WAL) != 0)
+	if ((instrument_options & INSTRUMENT_BUFFERS) != 0 ||
+		(instrument_options & INSTRUMENT_WAL) != 0 ||
+		(instrument_options & INSTRUMENT_IO) != 0)
 		instr = MemoryContextAllocZero(TopMemoryContext, sizeof(QueryInstrumentation));
 	else
 		instr = palloc0(sizeof(QueryInstrumentation));
@@ -213,7 +216,7 @@ InstrQueryStart(QueryInstrumentation *qinstr)
 {
 	InstrStart(&qinstr->instr);
 
-	if (qinstr->instr.need_bufusage || qinstr->instr.need_walusage)
+	if (qinstr->instr.need_bufusage || qinstr->instr.need_walusage || qinstr->instr.need_iousage)
 	{
 		Assert(CurrentResourceOwner != NULL);
 		qinstr->owner = CurrentResourceOwner;
@@ -228,7 +231,7 @@ InstrQueryStop(QueryInstrumentation *qinstr)
 {
 	InstrStop(&qinstr->instr);
 
-	if (qinstr->instr.need_bufusage || qinstr->instr.need_walusage)
+	if (qinstr->instr.need_bufusage || qinstr->instr.need_walusage || qinstr->instr.need_iousage)
 	{
 		Assert(qinstr->owner != NULL);
 		ResourceOwnerForgetInstrumentation(qinstr->owner, qinstr);
@@ -243,7 +246,7 @@ InstrQueryStopFinalize(QueryInstrumentation *qinstr)
 
 	InstrStopFinalize(&qinstr->instr);
 
-	if (!qinstr->instr.need_bufusage && !qinstr->instr.need_walusage)
+	if (!qinstr->instr.need_bufusage && !qinstr->instr.need_walusage && !qinstr->instr.need_iousage)
 		return qinstr;
 
 	Assert(qinstr->owner != NULL);
@@ -270,7 +273,7 @@ InstrQueryStopFinalize(QueryInstrumentation *qinstr)
 void
 InstrQueryRememberNode(QueryInstrumentation *parent, NodeInstrumentation *child)
 {
-	if (child->instr.need_bufusage || child->instr.need_walusage)
+	if (child->instr.need_bufusage || child->instr.need_walusage || child->instr.need_iousage)
 		dlist_push_head(&parent->unfinalized_children, &child->unfinalized_node);
 }
 
@@ -278,7 +281,7 @@ InstrQueryRememberNode(QueryInstrumentation *parent, NodeInstrumentation *child)
 QueryInstrumentation *
 InstrStartParallelQuery(void)
 {
-	QueryInstrumentation *qinstr = InstrQueryAlloc(INSTRUMENT_BUFFERS | INSTRUMENT_WAL);
+	QueryInstrumentation *qinstr = InstrQueryAlloc(INSTRUMENT_BUFFERS | INSTRUMENT_WAL | INSTRUMENT_IO);
 
 	InstrQueryStart(qinstr);
 	return qinstr;
@@ -291,6 +294,7 @@ InstrEndParallelQuery(QueryInstrumentation *qinstr, Instrumentation *dst)
 	qinstr = InstrQueryStopFinalize(qinstr);
 	memcpy(&dst->bufusage, &qinstr->instr.bufusage, sizeof(BufferUsage));
 	memcpy(&dst->walusage, &qinstr->instr.walusage, sizeof(WalUsage));
+	memcpy(&dst->iousage, &qinstr->instr.iousage, sizeof(IOUsage));
 }
 
 /*
@@ -310,6 +314,7 @@ InstrAccumParallelQuery(Instrumentation *instr)
 {
 	BufferUsageAdd(&instr_stack.current->bufusage, &instr->bufusage);
 	WalUsageAdd(&instr_stack.current->walusage, &instr->walusage);
+	IOUsageAdd(&instr_stack.current->iousage, &instr->iousage);
 
 	WalUsageAdd(&pgWalUsage, &instr->walusage);
 }
@@ -329,7 +334,9 @@ InstrAllocNode(int instrument_options, bool async_mode)
 	 * utility commands that restart transactions, which would require a
 	 * context that survives longer (EXPLAIN ANALYZE is fine).
 	 */
-	if ((instrument_options & INSTRUMENT_BUFFERS) != 0 || (instrument_options & INSTRUMENT_WAL) != 0)
+	if ((instrument_options & INSTRUMENT_BUFFERS) != 0 ||
+		(instrument_options & INSTRUMENT_WAL) != 0 ||
+		(instrument_options & INSTRUMENT_IO) != 0)
 		instr = MemoryContextAlloc(TopTransactionContext, sizeof(NodeInstrumentation));
 	else
 		instr = palloc(sizeof(NodeInstrumentation));
@@ -392,7 +399,7 @@ InstrStopNode(NodeInstrumentation *instr, double nTuples)
 		InstrStopNodeTimer(instr);
 
 	/* Only pop the stack, accumulation runs in InstrFinalizeNode */
-	if (instr->instr.need_bufusage || instr->instr.need_walusage)
+	if (instr->instr.need_bufusage || instr->instr.need_walusage || instr->instr.need_iousage)
 		InstrPopStack(&instr->instr);
 
 	instr->running = true;
@@ -407,7 +414,7 @@ InstrFinalizeNode(NodeInstrumentation *instr, Instrumentation *parent)
 	NodeInstrumentation *dst;
 
 	/* If we didn't use stack based instrumentation, nothing to be done */
-	if (!instr->instr.need_bufusage && !instr->instr.need_walusage)
+	if (!instr->instr.need_bufusage && !instr->instr.need_walusage && !instr->instr.need_iousage)
 		return instr;
 
 	/* Copy into per-query memory context */
@@ -418,7 +425,7 @@ InstrFinalizeNode(NodeInstrumentation *instr, Instrumentation *parent)
 	InstrAccum(parent, &dst->instr);
 
 	/* Unregister from query's unfinalized list before freeing */
-	if (instr->instr.need_bufusage || instr->instr.need_walusage)
+	if (instr->instr.need_bufusage || instr->instr.need_walusage || instr->instr.need_iousage)
 		dlist_delete(&instr->unfinalized_node);
 
 	pfree(instr);
@@ -489,6 +496,9 @@ InstrAggNode(NodeInstrumentation *dst, NodeInstrumentation *add)
 
 	if (dst->instr.need_walusage)
 		WalUsageAdd(&dst->instr.walusage, &add->instr.walusage);
+
+	if (dst->instr.need_iousage)
+		IOUsageAdd(&dst->instr.iousage, &add->instr.iousage);
 }
 
 /*
@@ -598,7 +608,8 @@ InstrNodeSetupExecProcNode(NodeInstrumentation *instr)
 {
 	bool		need_timer = instr->instr.need_timer;
 	bool		need_buf = (instr->instr.need_bufusage ||
-							instr->instr.need_walusage);
+							instr->instr.need_walusage ||
+							instr->instr.need_iousage);
 
 	if (need_timer && need_buf)
 		return ExecProcNodeInstrFull;
@@ -649,6 +660,7 @@ InstrAccum(Instrumentation *dst, Instrumentation *add)
 
 	BufferUsageAdd(&dst->bufusage, &add->bufusage);
 	WalUsageAdd(&dst->walusage, &add->walusage);
+	IOUsageAdd(&dst->iousage, &add->iousage);
 }
 
 /* dst += add */
@@ -684,6 +696,22 @@ WalUsageAdd(WalUsage *dst, const WalUsage *add)
 	dst->wal_buffers_full += add->wal_buffers_full;
 }
 
+/* dst += add (using max semantics for distance_max and distance_capacity) */
+void
+IOUsageAdd(IOUsage *dst, const IOUsage *add)
+{
+	dst->count += add->count;
+	dst->distance_sum += add->distance_sum;
+	if (add->distance_max > dst->distance_max)
+		dst->distance_max = add->distance_max;
+	if (add->distance_capacity > dst->distance_capacity)
+		dst->distance_capacity = add->distance_capacity;
+	dst->stall_count += add->stall_count;
+	dst->io_count += add->io_count;
+	dst->io_blocks += add->io_blocks;
+	dst->ios_in_progress += add->ios_in_progress;
+}
+
 void
 WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
 {
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index cd54c1a74a..6d2285beb1 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -71,6 +71,7 @@
  */
 #include "postgres.h"
 
+#include "executor/instrument.h"
 #include "miscadmin.h"
 #include "storage/aio.h"
 #include "storage/fd.h"
@@ -172,6 +173,19 @@ block_range_read_stream_cb(ReadStream *stream,
 	return InvalidBlockNumber;
 }
 
+/*
+ * Update IO instrumentation when returning a buffer to the consumer.
+ * Records the current look-ahead depth for averaging.
+ */
+static inline void
+read_stream_instr_update(ReadStream *stream)
+{
+	INSTR_IOUSAGE_INCR(count);
+	INSTR_IOUSAGE_ADD(distance_sum, stream->pinned_buffers);
+	INSTR_IOUSAGE_MAX(distance_max, stream->pinned_buffers);
+	INSTR_IOUSAGE_MAX(distance_capacity, stream->max_pinned_buffers);
+}
+
 /*
  * Ask the callback which block it would like us to read next, with a one block
  * buffer in front to allow read_stream_unget_block() to work.
@@ -380,6 +394,11 @@ read_stream_start_pending_read(ReadStream *stream)
 	Assert(stream->ios_in_progress < stream->max_ios);
 	stream->ios_in_progress++;
 	stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
+
+	/* Update I/O stats */
+	INSTR_IOUSAGE_INCR(io_count);
+	INSTR_IOUSAGE_ADD(io_blocks, nblocks);
+	INSTR_IOUSAGE_ADD(ios_in_progress, stream->ios_in_progress);
 }
 
 /*
@@ -851,6 +870,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 										   flags)))
 		{
 			/* Fast return. */
+			read_stream_instr_update(stream);
 			return buffer;
 		}
 
@@ -860,6 +880,9 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->ios_in_progress = 1;
 			stream->ios[0].buffer_index = oldest_buffer_index;
 			stream->seq_blocknum = next_blocknum + 1;
+
+			/* Since we executed IO synchronously, count it as a stall */
+			INSTR_IOUSAGE_INCR(stall_count);
 		}
 		else
 		{
@@ -871,6 +894,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		}
 
 		stream->fast_path = false;
+		read_stream_instr_update(stream);
 		return buffer;
 	}
 #endif
@@ -916,12 +940,17 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	{
 		int16		io_index = stream->oldest_io_index;
 		int32		distance;	/* wider temporary value, clamped below */
+		bool		needed_wait;
 
 		/* Sanity check that we still agree on the buffers. */
 		Assert(stream->ios[io_index].op.buffers ==
 			   &stream->buffers[oldest_buffer_index]);
 
-		WaitReadBuffers(&stream->ios[io_index].op);
+		needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
+
+		/* Count it as a stall if we needed to wait for I/O */
+		if (needed_wait)
+			INSTR_IOUSAGE_INCR(stall_count);
 
 		Assert(stream->ios_in_progress > 0);
 		stream->ios_in_progress--;
@@ -981,6 +1010,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	}
 #endif
 
+	read_stream_instr_update(stream);
+
 	/* Pin transferred to caller. */
 	Assert(stream->pinned_buffers > 0);
 	stream->pinned_buffers--;
diff --git a/src/include/commands/explain_state.h b/src/include/commands/explain_state.h
index 0b695f7d81..801e422437 100644
--- a/src/include/commands/explain_state.h
+++ b/src/include/commands/explain_state.h
@@ -54,6 +54,7 @@ typedef struct ExplainState
 	bool		summary;		/* print total planning and execution timing */
 	bool		memory;			/* print planner's memory usage information */
 	bool		settings;		/* print modified settings */
+	bool		io;				/* print info about IO (prefetch, ...) */
 	bool		generic;		/* generate a generic plan */
 	ExplainSerializeOption serialize;	/* serialize the query's output? */
 	ExplainFormat format;		/* output format */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index e4fc9e7870..0b7a84fc55 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -58,6 +58,22 @@ typedef struct WalUsage
 	int64		wal_buffers_full;	/* # of times the WAL buffers became full */
 } WalUsage;
 
+/*
+ * IOUsage tracks I/O prefetch activity that can be measured per executor
+ * node and displayed by EXPLAIN (ANALYZE, IO).
+ */
+typedef struct IOUsage
+{
+	int64		count;			/* # of buffers returned */
+	int64		distance_sum;	/* sum of look-ahead distances */
+	int64		distance_max;	/* max look-ahead distance observed */
+	int64		distance_capacity;	/* max possible look-ahead */
+	int64		stall_count;	/* # of I/O stalls */
+	int64		io_count;		/* # of I/O operations */
+	int64		io_blocks;		/* total blocks across I/Os */
+	int64		ios_in_progress;	/* sum of in-progress I/Os */
+} IOUsage;
+
 /* Flag bits included in InstrAlloc's instrument_options bitmask */
 typedef enum InstrumentOption
 {
@@ -65,6 +81,7 @@ typedef enum InstrumentOption
 	INSTRUMENT_BUFFERS = 1 << 1,	/* needs buffer usage */
 	INSTRUMENT_ROWS = 1 << 2,	/* needs row count */
 	INSTRUMENT_WAL = 1 << 3,	/* needs WAL usage */
+	INSTRUMENT_IO = 1 << 4,		/* needs I/O prefetch usage */
 	INSTRUMENT_ALL = PG_INT32_MAX
 } InstrumentOption;
 
@@ -92,12 +109,14 @@ typedef struct Instrumentation
 	bool		need_timer;		/* true if we need timer data */
 	bool		need_bufusage;	/* true if we need buffer usage data */
 	bool		need_walusage;	/* true if we need WAL usage data */
+	bool		need_iousage;	/* true if we need I/O prefetch data */
 	/* Internal state keeping: */
 	instr_time	starttime;		/* start time of last InstrStart */
 	/* Accumulated statistics: */
 	instr_time	total;			/* total runtime */
 	BufferUsage bufusage;		/* total buffer usage */
 	WalUsage	walusage;		/* total WAL usage */
+	IOUsage		iousage;		/* total I/O prefetch usage */
 } Instrumentation;
 
 /*
@@ -301,6 +320,7 @@ extern void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
 extern void WalUsageAdd(WalUsage *dst, const WalUsage *add);
 extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,
 							  const WalUsage *sub);
+extern void IOUsageAdd(IOUsage *dst, const IOUsage *add);
 
 #define INSTR_BUFUSAGE_INCR(fld) do { \
 		instr_stack.current->bufusage.fld++; \
@@ -324,4 +344,15 @@ extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,
 		instr_stack.current->walusage.fld += val; \
 	} while(0)
 
+#define INSTR_IOUSAGE_INCR(fld) do { \
+		instr_stack.current->iousage.fld++; \
+	} while(0)
+#define INSTR_IOUSAGE_ADD(fld,val) do { \
+		instr_stack.current->iousage.fld += val; \
+	} while(0)
+#define INSTR_IOUSAGE_MAX(fld,val) do { \
+		if ((val) > instr_stack.current->iousage.fld) \
+			instr_stack.current->iousage.fld = (val); \
+	} while(0)
+
 #endif							/* INSTRUMENT_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 1c9be944c5..985662a252 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1336,6 +1336,7 @@ InvalMessageArray
 InvalidationInfo
 InvalidationMsgsGroup
 IoMethodOps
+IOUsage
 IpcMemoryId
 IpcMemoryKey
 IpcMemoryState
-- 
2.43.0
