Hi,

Here is a significantly revised version of the patch series. A lot has
changed since the January submission, so I want to summarize the design
changes before getting into the patches. I believe it addresses the
points raised in the two reviews that landed since v5, though some of
those points may have become moot after my rewrite of the relevant
portions (thanks Junwang and ChangAo for the reviews in any case).
At this point it might be better to think of this as targeting v20. If
there is review bandwidth in the remaining two weeks before the v19
feature freeze, the rs_vistuples[] change described below, a standalone
improvement to the existing pagemode scan path, could be considered for
v19, though even that is an optimistic scenario.

It is also worth noting that Andres identified a number of
inefficiencies in the existing scan path in:

Re: unnecessary executor overheads around seqscans
https://postgr.es/m/xzflwwjtwxin3dxziyblrnygy3gfygo5dsuw6ltcoha73ecmnf%40nh6nonzta7kw

that are worth fixing independently of batching. Some of those fixes
may be better pursued first, both because they benefit all scan paths
and because they would make batching's gains more honest.

Separately, after looking at the previous version, Andres pointed out
offlist two fundamental issues with the patch's design:

* The heapam implementation (in a version of the patch I didn't post to
the thread) duplicated heap_prepare_pagescan() logic in a separate
batch-specific code path, which is not acceptable: changes should
benefit the existing slot interface too, and the duplication would hurt
future maintainability. The v5 version of that code is not great in
that respect either; it instead duplicated heapgettup_pagemode() to
slap batching on it.

* Allocating executor_batch_rows slots on the executor side to receive
rows from the AM adds significant overhead for slot initialization and
management, and for non-row-organized AMs that do not produce
individual rows at all, those slots would never be meaningfully
populated. In any case, he was not a fan of the slot-array approach the
moment I mentioned it.

For reference, the previous version had two slot arrays, inslots and
outslots, of TTSOpsHeapTuple type (not TTSOpsBufferHeapTuple, because
buffer pins were managed by the batch code, which has its own
modularity/correctness issues), populated via a materialize_all
callback.
A batch qual evaluator would copy qualifying tuples into outslots, with
an activeslots pointer switching between the two depending on whether
batch qual evaluation was used.

The new design addresses both issues and differs from the previous
version in several other ways:

* Single slot instead of slot arrays: there is a single TupleTableSlot,
reusing the scan node's ss_ScanTupleSlot, whose type was already
determined by the AM via table_slot_callbacks(). The slot is re-pointed
to each HeapTuple in the current buffer page via a new repoint_slot AM
callback, with no materialization or copying. Tuples are returned one
by one from the executor's perspective, but the AM serves them in
page-sized batches from pre-built HeapTupleData descriptors in
rs_vistuples[], avoiding repeated descent into heapam per tuple. This
is heapam's implementation of the batch interface; there is no
intention to force other AMs into the same row-oriented model.

* Batch qual evaluator not included: with the single-slot model, quals
are evaluated per tuple via the existing ExecQual path after each
repoint_slot call. A natural next step would be a new opcode (EEOP)
that calls repoint_slot() internally within expression evaluation,
allowing ExecQual to advance through multiple tuples from the same
batch without returning to the scan node each time, with qual results
accumulated in a bitmask in ExprState. The details of that will be
worked out in a follow-on series.

* heapgettup_pagemode_batch() gone: patch 0001 (described below) makes
HeapScanDesc store full HeapTupleData entries in rs_vistuples[], which
allows heap_getnextbatch() to simply advance a slice pointer into that
array without any additional copying or re-entering heap code, making a
separate batch-specific scan function unnecessary.

* TupleBatch renamed to RowBatch: "row batch" is more natural
terminology for this concept and is also consistent with how similar
abstractions are named in columnar and OLAP systems.
* AM callbacks now take RowBatch directly: previously
heap_getnextbatch() returned a void pointer that the executor would
store into RowBatch.am_payload, because only the executor knew the
internals of RowBatch. Now the AM receives RowBatch directly as a
parameter and can populate it without the executor acting as an
intermediary. This is also why RowBatch is introduced in its own patch
ahead of the AM API addition, so the struct definition is available to
both sides.

Patch 0001 changes rs_vistuples[] to store full HeapTupleData entries
instead of OffsetNumbers, as a standalone improvement to the existing
pagemode scan path. Measured on a pg_prewarm'd (and, in the all-visible
case, VACUUM FREEZE'd) table with 1M/5M/10M rows:

query                          all-visible       not-all-visible
count(*)                       -0.2% to +0.9%    -0.4% to +0.5%
count(*) WHERE id % 10 = 0     -1.1% to +3.4%    +0.2% to +1.5%
SELECT * LIMIT 1 OFFSET N      -2.2% to -0.6%    -0.9% to +6.6%
SELECT * WHERE id%10=0 LIMIT   -0.8% to +3.9%    +0.9% to +9.6%

No significant regression on either page type. The structural
improvement is most visible on not-all-visible pages, where
HeapTupleSatisfiesMVCCBatch() already reads every tuple header during
visibility checks, so persisting the result into rs_vistuples[]
eliminates the downstream re-read (in heapgettup_pagemode()) with no
measurable overhead. That said, these numbers are somewhat noisy on my
machine. Results on other machines would be welcome.

Patches 0002-0005 add the RowBatch infrastructure, the batch AM API and
heapam implementation including seqscan variants that use the new
scan_getnextbatch() API, and EXPLAIN (ANALYZE, BATCHES) support,
respectively.
With batching enabled (executor_batch_rows=300, ~MaxHeapTuplesPerPage):

query                          all-visible    not-all-visible
count(*)                       +11 to +15%    +9 to +13%
count(*) WHERE id % 10 = 0     +6 to +11%     +10 to +14%
SELECT * LIMIT 1 OFFSET N      +16 to +19%    +16 to +22%
SELECT * WHERE id%10=0 LIMIT   +8 to +10%     +8 to +13%

With executor_batch_rows=0, results are within noise of master across
all query types and sizes, confirming no regression from the
infrastructure changes themselves.

The not-all-visible results tend to show slightly higher gains than the
all-visible case. This is likely because the existing heapam code is
more optimized for the all-visible path; the not-all-visible path,
which goes through HeapTupleSatisfiesMVCCBatch() for per-tuple
visibility checks, has more headroom that batching can exploit.

Setting aside the current series for a moment, there are some broader
design questions worth raising while we have attention on this area.
Some of these echo points Tomas raised in his first reply on this
thread. I am reiterating them deliberately: I have not managed to fully
address them on my own, or simply did not need to for TAM-to-scan-node
batching, and I think they would benefit from wider input rather than
just my own iteration.

We should also start thinking about other ways the executor can consume
batch rows, not always assuming they are presented as HeapTupleData.
For instance, an AM could expose decoded column arrays directly to
operators that can consume them, bypassing slot-based deform entirely,
or a columnar AM could implement scan_getnextbatch by decoding column
strips directly into the batch without going through per-tuple
HeapTupleData at all. Feedback on whether the current RowBatch design
and the choices made in the scan_getnextbatch and RowBatchOps API make
that sort of thing harder than it needs to be would be appreciated.
For example, heapam's implementation of scan_getnextbatch uses a single
TTSOpsBufferHeapTuple slot, re-pointed to HeapTupleData entries one at
a time via repoint_slot in RowBatchHeapOps. That works for heapam, but
a columnar AM could implement scan_getnextbatch to decode column strips
directly into arrays in the batch, with no per-row repoint step needed
at all. Any adjustments that would make RowBatch more AM-agnostic are
worth discussing now, before the design hardens.

There are also broader open questions about how far the batch model can
extend beyond the scan node. Qual pushdown into the AM has been
discussed in nearby threads and would be one way to allow expression
evaluation to happen before data reaches the executor proper, though
that is a separate effort. For the purposes of this series, expression
evaluation still happens in the executor after scan_getnextbatch
returns. If the scan node does not project, the buffer heap slot is
passed directly to the parent node, which calls slot callbacks to
deform as needed. But once a node above projects, aggregates, or joins,
the notion of a page-sized batch from a single AM loses its meaning and
virtual slots take over. Whether RowBatch is usable or meaningful
beyond the scan/TAM boundary in any form, and whether the core executor
will ever have non-HeapTupleData batch consumption paths or leave that
entirely to extensions, are open questions worth discussing.

For RowBatch to eventually play the role that TupleTableSlot plays for
row-at-a-time execution, something inside it would need to serve as the
common currency for batch data, analogous to TupleTableSlot's
datum/isnull arrays. Column arrays are the obvious direction, but even
that leaves open the question of representation. PostgreSQL's Datum is
a pointer-sized abstraction that boxes everything, whereas vectorized
systems use typed packed arrays of native types with validity bitmasks,
which is a significant part of why tight vectorized loops are fast
there.
Whether column arrays of Datum would be good enough, or whether going
further toward typed packed arrays would be necessary to get meaningful
vectorization, is a deeper design question that this series
deliberately does not try to answer.

The focus for now is on getting batching working at the scan/TAM
boundary first, but thoughts on any of these points would be welcome.

-- 
Thanks, Amit Langote
From a095d26e1b5a361a7d42300e5364da948496f2ba Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Mon, 23 Mar 2026 18:21:47 +0900
Subject: [PATCH v6 3/5] Add batch table AM API and heapam implementation

Introduce table AM callbacks for batched tuple fetching:
scan_begin_batch, scan_getnextbatch, scan_reset_batch, and
scan_end_batch.  AMs must implement all four or none, which
table_supports_batching() checks.  scan_reset_batch releases held
resources (e.g. buffer pins) without freeing, allowing reuse across
rescans.

Provide the heapam implementation.  HeapPageBatch (stored in
RowBatch.am_payload) is a thin slice descriptor over the scan's
rs_vistuples[] array, which was introduced in the previous commit.
Rather than owning a copy of tuple headers, HeapPageBatch holds a
pointer into scan->rs_vistuples[] for the current slice and a buffer
pin for the current page.

heap_getnextbatch() calls heap_prepare_pagescan() to populate
rs_vistuples[] for each new page, then re-points hb->tuples to the
next slice of rs_vistuples[] on each call.  If the page has more
tuples than the executor's max_rows, subsequent calls return the next
slice without re-entering page preparation.  The buffer pin is held
until the page is fully consumed.

The batch path uses a single TupleTableSlot with TTSOpsBufferHeapTuple
ops (the scan node's ss_ScanTupleSlot, per table_slot_callbacks()).
heap_repoint_slot() re-points this slot to each tuple in turn via
ExecStoreBufferHeapTuple().  Consumers that need to retain the slot
across calls rely on the normal slot materialization contract.
Reviewed-by: Daniil Davydov <[email protected]>
Reviewed-by: ChangAo Chen <[email protected]>
Discussion: https://postgr.es/m/CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com
---
 src/backend/access/heap/heapam.c         | 229 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   8 +-
 src/include/access/heapam.h              |  33 ++++
 src/include/access/tableam.h             | 136 ++++++++++++++
 src/include/pgstat.h                     |   4 +-
 5 files changed, 403 insertions(+), 7 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index c6d0aacc5c9..e70c0ccbe82 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -43,6 +43,7 @@
 #include "catalog/pg_database.h"
 #include "catalog/pg_database_d.h"
 #include "commands/vacuum.h"
+#include "executor/execRowBatch.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
 #include "storage/lmgr.h"
@@ -109,6 +110,7 @@ static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp,
 										bool key_required, bool *copy);
+static void heap_repoint_slot(RowBatch *b, int idx);
 
 
 /*
@@ -1213,7 +1215,7 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_cbuf = InvalidBuffer;
 
 	/*
-	 * Disable page-at-a-time mode if it's not a MVCC-safe snapshot.
+	 * Disable page-at-a-time mode if the snapshot does not allow it.
 	 */
 	if (!(snapshot && IsMVCCSnapshot(snapshot)))
 		scan->rs_base.rs_flags &= ~SO_ALLOW_PAGEMODE;
@@ -1463,7 +1465,7 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
 	 * the proper return buffer and return the tuple.
	 */

-	pgstat_count_heap_getnext(scan->rs_base.rs_rd);
+	pgstat_count_heap_getnext(scan->rs_base.rs_rd, 1);
	return &scan->rs_ctup;
 }
 
@@ -1491,13 +1493,232 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s
 	 * the proper return buffer and return the tuple.
 	 */
-	pgstat_count_heap_getnext(scan->rs_base.rs_rd);
+	pgstat_count_heap_getnext(scan->rs_base.rs_rd, 1);
 
 	ExecStoreBufferHeapTuple(&scan->rs_ctup, slot, scan->rs_cbuf);
 	return true;
 }
 
+/*---------- Batching support -----------*/
+
+static const RowBatchOps RowBatchHeapOps =
+{
+	.repoint_slot = heap_repoint_slot
+};
+
+/*
+ * heap_batch_feasible
+ *		Batching requires an MVCC snapshot since it relies on
+ *		page-at-a-time mode, which heap_beginscan() disables for
+ *		non-MVCC snapshots.
+ */
+bool
+heap_batch_feasible(Relation relation, Snapshot snapshot)
+{
+	return snapshot && IsMVCCSnapshot(snapshot);
+}
+
+/*
+ * heap_begin_batch
+ *		Initialize AM-side batch state for a heap scan.
+ *
+ * Allocates a HeapPageBatch, which acts as a thin slice descriptor over
+ * the scan's rs_vistuples[] array.  Unlike the previous version there is
+ * no separate tuple header storage in HeapPageBatch itself; rs_vistuples[]
+ * in HeapScanDescData (populated by page_collect_tuples() via
+ * heap_prepare_pagescan()) serves as the page-level buffer.  HeapPageBatch
+ * holds a pointer into that array for the current slice and the buffer pin
+ * for the current page.
+ *
+ * b->slot must be a TTSOpsBufferHeapTuple slot.
+ */
+void
+heap_begin_batch(TableScanDesc sscan, RowBatch *b)
+{
+	HeapPageBatch *hb;
+
+	/* Batch path relies on executor-level qual eval, not AM scan keys */
+	Assert(sscan->rs_nkeys == 0);
+	Assert(TTS_IS_BUFFERTUPLE(b->slot));
+
+	hb = palloc(sizeof(HeapPageBatch));
+	hb->tuples = NULL;
+	hb->ntuples = 0;
+	hb->nextitem = 0;
+	hb->buf = InvalidBuffer;
+
+	b->am_payload = hb;
+	b->ops = &RowBatchHeapOps;
+}
+
+/*
+ * heap_reset_batch
+ *		Release pin and reset for rescan, keeping allocations.
+ */
+void
+heap_reset_batch(TableScanDesc sscan, RowBatch *b)
+{
+	HeapPageBatch *hb = (HeapPageBatch *) b->am_payload;
+
+	Assert(hb != NULL);
+
+	if (BufferIsValid(hb->buf))
+	{
+		ReleaseBuffer(hb->buf);
+		hb->buf = InvalidBuffer;
+	}
+	hb->ntuples = 0;
+	hb->nextitem = 0;
+}
+
+/*
+ * heap_end_batch
+ *		Release all batch resources.
+ */
+void
+heap_end_batch(TableScanDesc sscan, RowBatch *b)
+{
+	HeapPageBatch *hb = (HeapPageBatch *) b->am_payload;
+
+	if (BufferIsValid(hb->buf))
+		ReleaseBuffer(hb->buf);
+
+	pfree(hb);
+	b->am_payload = NULL;
+}
+
+/*
+ * heap_getnextbatch
+ *		Fetch the next slice of visible tuples from a heap scan.
+ *
+ * Serves slices from the current page's rs_vistuples[] array.  If the
+ * current page has remaining tuples, sets hb->tuples to point at the next
+ * slice without re-entering the page scan.  If the page is exhausted,
+ * advances to the next page via heap_fetch_next_buffer(), prepares it
+ * with heap_prepare_pagescan(), and serves the first slice from it.
+ *
+ * hb->tuples points directly into scan->rs_vistuples[]; the entries remain
+ * valid as long as hb->buf (the page's buffer pin) is held.  The pin is
+ * released at the top of the next call once the page is fully consumed.
+ *
+ * Each call returns at most b->max_rows tuples.
+ *
+ * Returns true if tuples were fetched, false at end of scan.
+ */
+bool
+heap_getnextbatch(TableScanDesc sscan, RowBatch *b, ScanDirection dir)
+{
+	HeapScanDesc scan = (HeapScanDesc) sscan;
+	HeapPageBatch *hb = (HeapPageBatch *) b->am_payload;
+	int			remaining;
+	int			nserve;
+
+	Assert(ScanDirectionIsForward(dir));
+	Assert(sscan->rs_flags & SO_ALLOW_PAGEMODE);
+
+	/*
+	 * Try to serve from the current page first.  No page advance, no buffer
+	 * management, no re-entry into heap code.
+	 */
+	remaining = scan->rs_ntuples - hb->nextitem;
+	if (remaining > 0)
+	{
+		nserve = Min(remaining, b->max_rows);
+
+		hb->tuples = &scan->rs_vistuples[hb->nextitem];
+		hb->ntuples = nserve;
+		hb->nextitem += nserve;
+
+		b->nrows = nserve;
+		b->pos = 0;
+
+		pgstat_count_heap_getnext(sscan->rs_rd, nserve);
+		return true;
+	}
+
+	/*
+	 * Current page exhausted.  Advance to the next page with visible tuples.
+	 */
+	for (;;)
+	{
+		/*
+		 * Release the previous page's pin.  The page is fully consumed at
+		 * this point -- all slices have been served.
+		 */
+		if (BufferIsValid(hb->buf))
+		{
+			ReleaseBuffer(hb->buf);
+			hb->buf = InvalidBuffer;
+		}
+
+		heap_fetch_next_buffer(scan, dir);
+
+		if (!BufferIsValid(scan->rs_cbuf))
+		{
+			/* End of scan */
+			scan->rs_cblock = InvalidBlockNumber;
+			scan->rs_prefetch_block = InvalidBlockNumber;
+			scan->rs_inited = false;
+			b->nrows = 0;
+			return false;
+		}
+
+		Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
+
+		/*
+		 * Prepare the page: prune, run visibility checks, and populate
+		 * scan->rs_vistuples[0..rs_ntuples-1] via page_collect_tuples().
+		 */
+		heap_prepare_pagescan(sscan);
+
+		if (scan->rs_ntuples > 0)
+		{
+			/*
+			 * Pin the page so tuple data stays valid while the executor
+			 * processes slices.  Released at the top of the next call
+			 * once the page is fully consumed.
+			 */
+			IncrBufferRefCount(scan->rs_cbuf);
+			hb->buf = scan->rs_cbuf;
+
+			nserve = Min(scan->rs_ntuples, b->max_rows);
+
+			hb->tuples = &scan->rs_vistuples[0];
+			hb->ntuples = nserve;
+			hb->nextitem = nserve;
+
+			b->nrows = nserve;
+			b->pos = 0;
+
+			pgstat_count_heap_getnext(sscan->rs_rd, nserve);
+			return true;
+		}
+
+		/* Empty page (all dead/invisible tuples), try next */
+	}
+}
+
+/*
+ * heap_repoint_slot
+ *		Re-point the batch's single slot to the tuple at index idx.
+ *
+ * Called by RowBatchGetNextSlot() for each tuple served to the parent
+ * node.  hb->tuples[idx] was populated by page_collect_tuples() via
+ * heap_prepare_pagescan() and remains valid as long as hb->buf is pinned.
+ */
+static void
+heap_repoint_slot(RowBatch *b, int idx)
+{
+	HeapPageBatch *hb = (HeapPageBatch *) b->am_payload;
+
+	Assert(idx >= 0 && idx < hb->ntuples);
+	Assert(TTS_IS_BUFFERTUPLE(b->slot));
+
+	ExecStoreBufferHeapTuple(&hb->tuples[idx], b->slot, hb->buf);
+}
+
+/*----- End of batching support -----*/
+
 void
 heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
 				  ItemPointer maxtid)
@@ -1639,7 +1860,7 @@ heap_getnextslot_tidrange(TableScanDesc sscan, ScanDirection direction,
 	 * if we get here it means we have a new current scan tuple, so point to
 	 * the proper return buffer and return the tuple.
 	 */
-	pgstat_count_heap_getnext(scan->rs_base.rs_rd);
+	pgstat_count_heap_getnext(scan->rs_base.rs_rd, 1);
 
 	ExecStoreBufferHeapTuple(&scan->rs_ctup, slot, scan->rs_cbuf);
 	return true;
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 2fd120028bb..8124d573ac3 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2348,7 +2348,7 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
 			ExecStoreBufferHeapTuple(tuple, slot, hscan->rs_cbuf);
 
 			/* Count successfully-fetched tuples as heap fetches */
-			pgstat_count_heap_getnext(scan->rs_rd);
+			pgstat_count_heap_getnext(scan->rs_rd, 1);
 			return true;
 		}
@@ -2637,6 +2637,12 @@ static const TableAmRoutine heapam_methods = {
 	.scan_rescan = heap_rescan,
 	.scan_getnextslot = heap_getnextslot,
 
+	.scan_batch_feasible = heap_batch_feasible,
+	.scan_begin_batch = heap_begin_batch,
+	.scan_getnextbatch = heap_getnextbatch,
+	.scan_end_batch = heap_end_batch,
+	.scan_reset_batch = heap_reset_batch,
+
 	.scan_set_tidrange = heap_set_tidrange,
 	.scan_getnextslot_tidrange = heap_getnextslot_tidrange,
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 09b9566d0ac..0783fa13c4c 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -107,6 +107,32 @@ typedef struct HeapScanDescData
 } HeapScanDescData;
 typedef struct HeapScanDescData *HeapScanDesc;
 
+/*
+ * HeapPageBatch -- heapam-private page-level batch state.
+ *
+ * Thin slice descriptor over the scan's rs_vistuples[] array.  Rather
+ * than owning a copy of tuple headers, HeapPageBatch holds a pointer
+ * into scan->rs_vistuples[] for the current slice, which was populated
+ * by page_collect_tuples() during heap_prepare_pagescan().
+ *
+ * The executor consumes tuples in slices.  Each heap_getnextbatch call
+ * re-points tuples to the next slice and advances nextitem, serving up
+ * to RowBatch.max_rows tuples from the current page before advancing
+ * to the next.
+ *
+ * buf holds the pin for the current page.  Tuple data referenced via
+ * tuples remains valid as long as buf is pinned.
+ *
+ * Stored in RowBatch.am_payload.
+ */
+typedef struct HeapPageBatch
+{
+	HeapTupleData *tuples;		/* points into scan->rs_vistuples[nextitem] */
+	int			ntuples;		/* tuples in current slice */
+	int			nextitem;		/* next unserved tuple index in rs_vistuples[] */
+	Buffer		buf;			/* pinned buffer for current page */
+} HeapPageBatch;
+
 typedef struct BitmapHeapScanDescData
 {
 	HeapScanDescData rs_heap_base;
@@ -362,6 +388,13 @@ extern void heap_endscan(TableScanDesc sscan);
 extern HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction);
 extern bool heap_getnextslot(TableScanDesc sscan,
 							 ScanDirection direction, TupleTableSlot *slot);
+
+extern bool heap_batch_feasible(Relation relation, Snapshot snapshot);
+extern void heap_begin_batch(TableScanDesc sscan, RowBatch *batch);
+extern bool heap_getnextbatch(TableScanDesc sscan, RowBatch *batch, ScanDirection dir);
+extern void heap_end_batch(TableScanDesc sscan, RowBatch *batch);
+extern void heap_reset_batch(TableScanDesc sscan, RowBatch *batch);
+
 extern void heap_set_tidrange(TableScanDesc sscan,
 							  ItemPointer mintid,
 							  ItemPointer maxtid);
 extern bool heap_getnextslot_tidrange(TableScanDesc sscan,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 06084752245..a72be111c26 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -275,6 +275,8 @@ typedef void (*IndexBuildCallback) (Relation index,
 									bool tupleIsAlive,
 									void *state);
 
+typedef struct RowBatch RowBatch;
+
 /*
  * API struct for a table AM.  Note this must be allocated in a
  * server-lifetime manner, typically as a static const struct, which then gets
@@ -351,6 +353,56 @@ typedef struct TableAmRoutine
 										ScanDirection direction,
 										TupleTableSlot *slot);
 
+	/* ------------------------------------------------------------------------
+	 * Batched scan support
+	 * ------------------------------------------------------------------------
+	 */
+
+	/*
+	 * Returns true if the AM can support batching for a scan with the
+	 * given snapshot.  Called at plan init time before the scan descriptor
+	 * exists.  AMs that have no snapshot-based restrictions can omit this
+	 * callback, in which case batching is considered feasible.
+	 */
+	bool		(*scan_batch_feasible) (Relation relation, Snapshot snapshot);
+
+	/*
+	 * Initialize AM-owned batch state for a scan.  Called once before
+	 * the first scan_getnextbatch call.  The AM allocates whatever
+	 * private state it needs and stores it in b->am_payload.  b->slot
+	 * is the scan node's ss_ScanTupleSlot, whose type was already
+	 * determined by the AM via table_slot_callbacks().  The AM's
+	 * repoint_slot callback re-points it to each tuple in the batch
+	 * in turn.  Future interfaces may allow the AM to expose batch
+	 * data in other forms without going through a slot.
+	 */
+	void		(*scan_begin_batch) (TableScanDesc sscan, RowBatch *b);
+
+	/*
+	 * Fetch the next batch of tuples from the scan into b.  Sets b->nrows
+	 * to the number of tuples available and resets b->pos to 0.  Returns
+	 * true if any tuples were fetched, false at end of scan.  The caller
+	 * advances through the batch via RowBatchGetNextSlot(), which calls
+	 * ops->repoint_slot for each position up to b->nrows.
+	 */
+	bool		(*scan_getnextbatch) (TableScanDesc sscan, RowBatch *b,
+									  ScanDirection dir);
+
+	/*
+	 * Release all AM-owned batch resources, including any buffer pins
+	 * held in am_payload.  Called when the scan node is shut down.
+	 * After this call b->am_payload must not be used.
+	 */
+	void		(*scan_end_batch) (TableScanDesc sscan, RowBatch *b);
+
+	/*
+	 * Reset batch state for rescan.  Release any held resources (e.g.
+	 * buffer pins) and reset counts, but keep the allocation so the
+	 * next getnextbatch call can reuse it without re-entering
+	 * begin_batch.
+	 */
+	void		(*scan_reset_batch) (TableScanDesc sscan, RowBatch *b);
+
 	/*-----------
 	 * Optional functions to provide scanning for ranges of ItemPointers.
 	 * Implementations must either provide both of these functions, or neither
@@ -1047,6 +1099,90 @@ table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableS
 	return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
 }
 
+/*
+ * table_supports_batching
+ *		Does the relation's AM support batching?
+ */
+static inline bool
+table_supports_batching(Relation relation, Snapshot snapshot)
+{
+	const TableAmRoutine *tam = relation->rd_tableam;
+
+	if (tam->scan_getnextbatch == NULL)
+		return false;
+
+	Assert(tam->scan_begin_batch != NULL);
+	Assert(tam->scan_reset_batch != NULL);
+	Assert(tam->scan_end_batch != NULL);
+
+	/*
+	 * Optional: AM may restrict batching based on snapshot or other
+	 * conditions.
+	 */
+	if (tam->scan_batch_feasible != NULL &&
+		!tam->scan_batch_feasible(relation, snapshot))
+		return false;
+
+	return true;
+}
+
+/*
+ * table_scan_begin_batch
+ *		Allocate AM-owned batch payload in the RowBatch.
+ */
+static inline void
+table_scan_begin_batch(TableScanDesc sscan, RowBatch *b)
+{
+	const TableAmRoutine *tam = sscan->rs_rd->rd_tableam;
+
+	Assert(tam->scan_begin_batch != NULL);
+
+	tam->scan_begin_batch(sscan, b);
+}
+
+/*
+ * table_scan_getnextbatch
+ *		Fetch the next batch of tuples from the AM.  Returns true if tuples
+ *		were fetched, false at end of scan.  Only forward scans are
+ *		supported.
+ */
+static inline bool
+table_scan_getnextbatch(TableScanDesc sscan, RowBatch *b, ScanDirection dir)
+{
+	const TableAmRoutine *tam = sscan->rs_rd->rd_tableam;
+
+	Assert(ScanDirectionIsForward(dir));
+	Assert(tam->scan_getnextbatch != NULL);
+
+	return tam->scan_getnextbatch(sscan, b, dir);
+}
+
+/*
+ * table_scan_end_batch
+ *		Release AM-owned resources for the batch payload.
+ */
+static inline void
+table_scan_end_batch(TableScanDesc sscan, RowBatch *b)
+{
+	const TableAmRoutine *tam = sscan->rs_rd->rd_tableam;
+
+	Assert(tam->scan_end_batch != NULL);
+
+	tam->scan_end_batch(sscan, b);
+}
+
+/*
+ * table_scan_reset_batch
+ *		Reset AM-owned batch state for rescan without freeing.
+ */
+static inline void
+table_scan_reset_batch(TableScanDesc sscan, RowBatch *b)
+{
+	const TableAmRoutine *tam = sscan->rs_rd->rd_tableam;
+
+	Assert(tam->scan_reset_batch != NULL);
+
+	tam->scan_reset_batch(sscan, b);
+}
+
 /* ----------------------------------------------------------------------------
  * TID Range scanning related functions.
  * ----------------------------------------------------------------------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 216b93492ba..0344c4e88c3 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -695,10 +695,10 @@ extern void pgstat_report_analyze(Relation rel,
 		if (pgstat_should_count_relation(rel))							\
 			(rel)->pgstat_info->counts.numscans++;						\
 	} while (0)
-#define pgstat_count_heap_getnext(rel) \
+#define pgstat_count_heap_getnext(rel, n) \
 	do {																\
 		if (pgstat_should_count_relation(rel))							\
-			(rel)->pgstat_info->counts.tuples_returned++;				\
+			(rel)->pgstat_info->counts.tuples_returned += (n);			\
 	} while (0)
 #define pgstat_count_heap_fetch(rel) \
 	do {																\
-- 
2.47.3
From d7e8f76144cb27e761e2d4bc9c687dd0a2de203e Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Thu, 12 Mar 2026 09:18:04 +0900
Subject: [PATCH v6 1/5] heapam: store full HeapTupleData in rs_vistuples[]
 for pagemode scans

page_collect_tuples() builds full HeapTupleData headers for every
visible tuple on a page -- t_data, t_len, t_self, t_tableOid -- but
previously discarded them immediately after writing just the
OffsetNumber of each survivor into rs_vistuples[].
heapgettup_pagemode() then re-derived those same values on every call
from the saved OffsetNumber via PageGetItemId() and PageGetItem().

Change rs_vistuples[] element type from OffsetNumber to HeapTupleData
and populate it inside page_collect_tuples() while lpp, lineoff, page,
block, and relid are already in scope, so no additional page reads are
needed.  For the all_visible path (the common case on a primary not
under active modification) the write piggy-backs on the existing
per-lineoff loop.  For the !all_visible path, HeapTupleData entries
are written during the visibility loop and compacted to visible
survivors afterwards using batchmvcc.visible[], avoiding a return to
pd_linp[] via PageGetItemId().

With rs_vistuples[] populated, heapgettup_pagemode() replaces the
per-tuple PageGetItemId/PageGetItem calls with a single struct copy:

    *tuple = scan->rs_vistuples[lineindex];

The stack-local HeapTupleData array in BatchMVCCState is eliminated by
passing rs_vistuples[] directly to HeapTupleSatisfiesMVCCBatch(),
saving MaxHeapTuplesPerPage * 24 bytes of stack per
page_collect_tuples() call.  HeapTupleSatisfiesMVCCBatch() loses its
vistuples_dense parameter since compaction is now handled by the
caller.

t_tableOid is pre-initialized for all rs_vistuples[] entries at scan
start in heap_beginscan(), eliminating a store per visible tuple from
the fill loop.  The raw ItemId word is read once per tuple with lp_off
and lp_len extracted via mask and shift rather than calling
ItemIdGetOffset() and ItemIdGetLength() separately, avoiding a
potential second load from the same address in the inner loop.

Having pre-built HeapTupleData headers available at the scan
descriptor level also lays groundwork for a batched tuple interface,
where an AM can serve multiple tuples per call without repeating the
line pointer traversal.

Suggested-by: Andres Freund <[email protected]>
---
 src/backend/access/heap/heapam.c            | 73 ++++++++++++---------
 src/backend/access/heap/heapam_handler.c    | 19 ++----
 src/backend/access/heap/heapam_visibility.c | 21 +++---
 src/include/access/heapam.h                 |  5 +-
 4 files changed, 58 insertions(+), 60 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e5bd062de77..c6d0aacc5c9 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -524,7 +524,6 @@ page_collect_tuples(HeapScanDesc scan, Snapshot snapshot,
 					BlockNumber block, int lines,
 					bool all_visible, bool check_serializable)
 {
-	Oid			relid = RelationGetRelid(scan->rs_base.rs_rd);
 	int			ntup = 0;
 	int			nvis = 0;
 	BatchMVCCState batchmvcc;
@@ -536,7 +535,7 @@ page_collect_tuples(HeapScanDesc scan, Snapshot snapshot,
 	for (OffsetNumber lineoff = FirstOffsetNumber; lineoff <= lines; lineoff++)
 	{
 		ItemId		lpp = PageGetItemId(page, lineoff);
-		HeapTuple	tup;
+		HeapTuple	tup = &scan->rs_vistuples[ntup];
 
 		if (unlikely(!ItemIdIsNormal(lpp)))
 			continue;
@@ -549,25 +548,33 @@ page_collect_tuples(HeapScanDesc scan, Snapshot snapshot,
 		 */
 		if (!all_visible || check_serializable)
 		{
-			tup = &batchmvcc.tuples[ntup];
+			uint32		lp_val = *(uint32 *) lpp;
 
-			tup->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
-			tup->t_len = ItemIdGetLength(lpp);
-			tup->t_tableOid = relid;
+			tup->t_data = (HeapTupleHeader) ((char *) page + (lp_val & 0x7fff));
+			tup->t_len = lp_val >> 17;
+			Assert(tup->t_tableOid == RelationGetRelid(scan->rs_base.rs_rd));
 			ItemPointerSet(&(tup->t_self), block, lineoff);
 		}
 
-		/*
-		 * If the page is all visible, these fields otherwise won't be
-		 * populated in loop below.
-		 */
 		if (all_visible)
 		{
 			if (check_serializable)
-			{
 				batchmvcc.visible[ntup] = true;
+
+			/*
+			 * In the all_visible && !check_serializable path, the block
+			 * above was skipped, so tup's fields have not been set yet.
+			 * Fill them here while lpp is still in hand.
+			 */
+			if (!check_serializable)
+			{
+				uint32		lp_val = *(uint32 *) lpp;
+
+				tup->t_data = (HeapTupleHeader) ((char *) page + (lp_val & 0x7fff));
+				tup->t_len = lp_val >> 17;
+				Assert(tup->t_tableOid == RelationGetRelid(scan->rs_base.rs_rd));
+				ItemPointerSet(&tup->t_self, block, lineoff);
 			}
-			scan->rs_vistuples[ntup] = lineoff;
 		}
 
 		ntup++;
@@ -598,11 +605,24 @@ page_collect_tuples(HeapScanDesc scan, Snapshot snapshot,
 		{
 			HeapCheckForSerializableConflictOut(batchmvcc.visible[i],
 												scan->rs_base.rs_rd,
-												&batchmvcc.tuples[i],
+												&scan->rs_vistuples[i],
 												buffer, snapshot);
 		}
 	}
+
+	/* Now compact rs_vistuples[] to visible survivors only */
+	if (!all_visible)
+	{
+		int			dst = 0;
+
+		for (int i = 0; i < ntup; i++)
+		{
+			if (batchmvcc.visible[i])
+				scan->rs_vistuples[dst++] = scan->rs_vistuples[i];
+		}
+		Assert(dst == nvis);
+	}
+
 	return nvis;
 }
@@ -1073,14 +1093,13 @@ heapgettup_pagemode(HeapScanDesc scan,
 					ScanKey key)
 {
 	HeapTuple	tuple = &(scan->rs_ctup);
-	Page		page;
 	uint32		lineindex;
 	uint32		linesleft;
 
 	if (likely(scan->rs_inited))
 	{
 		/* continue from previously returned page/tuple */
-		page = BufferGetPage(scan->rs_cbuf);
+		Assert(BufferIsValid(scan->rs_cbuf));
 
 		lineindex = scan->rs_cindex + dir;
 		if (ScanDirectionIsForward(dir))
@@ -1108,29 +1127,21 @@ heapgettup_pagemode(HeapScanDesc scan,
 		/* prune the page and determine visible tuple offsets */
 		heap_prepare_pagescan((TableScanDesc) scan);
-		page = BufferGetPage(scan->rs_cbuf);
 		linesleft = scan->rs_ntuples;
 		lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
 
-		/* block is the same for all tuples, set it once outside the loop */
-		ItemPointerSetBlockNumber(&tuple->t_self, scan->rs_cblock);
-
 		/* lineindex now references the next or previous visible tid */
 continue_page:
 
 		for (; linesleft > 0; linesleft--, lineindex += dir)
 		{
-			ItemId		lpp;
-			OffsetNumber lineoff;
-
-			Assert(lineindex < scan->rs_ntuples);
-			lineoff = scan->rs_vistuples[lineindex];
-			lpp = PageGetItemId(page, lineoff);
-			Assert(ItemIdIsNormal(lpp));
-
-			tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
-			tuple->t_len = ItemIdGetLength(lpp);
-			ItemPointerSetOffsetNumber(&tuple->t_self, lineoff);
+			/*
+			 * Headers were pre-built by page_collect_tuples() into
+			 * rs_vistuples[].  Copy the entry; t_data still points into the
+			 * pinned page, which is safe for the lifetime of the current page
+			 * scan.
+			 */
+			*tuple = scan->rs_vistuples[lineindex];
 
 			/* skip any tuples that don't match the scan key */
 			if (key != NULL &&
@@ -1244,6 +1255,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 
 	/* we only need to set this up once */
 	scan->rs_ctup.t_tableOid = RelationGetRelid(relation);
+	for (int i = 0; i < MaxHeapTuplesPerPage; i++)
+		scan->rs_vistuples[i].t_tableOid = RelationGetRelid(relation);
 
 	/*
 	 * Allocate memory to keep track of page allocation for parallel workers
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 253a735b6c1..2fd120028bb 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2153,9 +2153,6 @@ heapam_scan_bitmap_next_tuple(TableScanDesc scan,
 {
 	BitmapHeapScanDesc bscan = (BitmapHeapScanDesc) scan;
 	HeapScanDesc hscan = (HeapScanDesc) bscan;
-	OffsetNumber targoffset;
-	Page		page;
-	ItemId		lp;
 
 	/*
 	 * Out of range?
If so, nothing more to look at on this page @@ -2170,15 +2167,7 @@ heapam_scan_bitmap_next_tuple(TableScanDesc scan, return false; } - targoffset = hscan->rs_vistuples[hscan->rs_cindex]; - page = BufferGetPage(hscan->rs_cbuf); - lp = PageGetItemId(page, targoffset); - Assert(ItemIdIsNormal(lp)); - - hscan->rs_ctup.t_data = (HeapTupleHeader) PageGetItem(page, lp); - hscan->rs_ctup.t_len = ItemIdGetLength(lp); - hscan->rs_ctup.t_tableOid = scan->rs_rd->rd_id; - ItemPointerSet(&hscan->rs_ctup.t_self, hscan->rs_cblock, targoffset); + hscan->rs_ctup = hscan->rs_vistuples[hscan->rs_cindex]; pgstat_count_heap_fetch(scan->rs_rd); @@ -2456,7 +2445,7 @@ SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer, while (start < end) { uint32 mid = start + (end - start) / 2; - OffsetNumber curoffset = hscan->rs_vistuples[mid]; + OffsetNumber curoffset = hscan->rs_vistuples[mid].t_self.ip_posid; if (tupoffset == curoffset) return true; @@ -2575,7 +2564,7 @@ BitmapHeapScanNextBlock(TableScanDesc scan, ItemPointerSet(&tid, block, offnum); if (heap_hot_search_buffer(&tid, scan->rs_rd, buffer, snapshot, &heapTuple, NULL, true)) - hscan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid); + hscan->rs_vistuples[ntup++] = heapTuple; } } else @@ -2604,7 +2593,7 @@ BitmapHeapScanNextBlock(TableScanDesc scan, valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); if (valid) { - hscan->rs_vistuples[ntup++] = offnum; + hscan->rs_vistuples[ntup++] = loctup; PredicateLockTID(scan->rs_rd, &loctup.t_self, snapshot, HeapTupleHeaderGetXmin(loctup.t_data)); } diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c index fc64f4343ce..cd6cd4d8d69 100644 --- a/src/backend/access/heap/heapam_visibility.c +++ b/src/backend/access/heap/heapam_visibility.c @@ -1670,16 +1670,16 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, } /* - * Perform HeaptupleSatisfiesMVCC() on each passed in tuple. 
This is more + * Perform HeapTupleSatisfiesMVCC() on each passed in tuple. This is more * efficient than doing HeapTupleSatisfiesMVCC() one-by-one. * - * To be checked tuples are passed via BatchMVCCState->tuples. Each tuple's - * visibility is stored in batchmvcc->visible[]. In addition, - * ->vistuples_dense is set to contain the offsets of visible tuples. + * Each tuple's visibility is stored in batchmvcc->visible[]. The caller + * is responsible for compacting the tuples array to contain only visible + * survivors after this function returns. * - * The reason this is more efficient than HeapTupleSatisfiesMVCC() is that it - * avoids a cross-translation-unit function call for each tuple, allows the - * compiler to optimize across calls to HeapTupleSatisfiesMVCC and allows + * The reason this is more efficient than HeapTupleSatisfiesMVCC() is that + * it avoids a cross-translation-unit function call for each tuple, allows + * the compiler to optimize across calls to HeapTupleSatisfiesMVCC and allows * setting hint bits more efficiently (see the one BufferFinishSetHintBits() * call below). 
* @@ -1689,7 +1689,7 @@ int HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer, int ntups, BatchMVCCState *batchmvcc, - OffsetNumber *vistuples_dense) + HeapTupleData *tuples) { int nvis = 0; SetHintBitsState state = SHB_INITIAL; @@ -1699,16 +1699,13 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer, for (int i = 0; i < ntups; i++) { bool valid; - HeapTuple tup = &batchmvcc->tuples[i]; + HeapTuple tup = &tuples[i]; valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer, &state); batchmvcc->visible[i] = valid; if (likely(valid)) - { - vistuples_dense[nvis] = tup->t_self.ip_posid; nvis++; - } } if (state == SHB_ENABLED) diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 2fdc50b865b..09b9566d0ac 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -103,7 +103,7 @@ typedef struct HeapScanDescData /* these fields only used in page-at-a-time mode and for bitmap scans */ uint32 rs_cindex; /* current tuple's index in vistuples */ uint32 rs_ntuples; /* number of visible tuples on page */ - OffsetNumber rs_vistuples[MaxHeapTuplesPerPage]; /* their offsets */ + HeapTupleData rs_vistuples[MaxHeapTuplesPerPage]; /* tuples */ } HeapScanDescData; typedef struct HeapScanDescData *HeapScanDesc; @@ -483,14 +483,13 @@ extern bool HeapTupleIsSurelyDead(HeapTuple htup, */ typedef struct BatchMVCCState { - HeapTupleData tuples[MaxHeapTuplesPerPage]; bool visible[MaxHeapTuplesPerPage]; } BatchMVCCState; extern int HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer, int ntups, BatchMVCCState *batchmvcc, - OffsetNumber *vistuples_dense); + HeapTupleData *tuples); /* * To avoid leaking too much knowledge about reorderbuffer implementation -- 2.47.3
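To make the 0001 mechanics easier to review in isolation, here is a standalone toy model (illustrative names only, not the actual PostgreSQL definitions) of the two things the patch does in page_collect_tuples(): decoding one raw ItemId word with a mask and a shift instead of separate ItemIdGetOffset()/ItemIdGetLength() calls (ItemIdData is lp_off:15, lp_flags:2, lp_len:15 on little-endian), and compacting the pre-filled tuple array down to its visible survivors in place using the visible[] flags:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* ItemIdData layout: lp_off bits 0-14, lp_flags bits 15-16, lp_len bits 17-31 */
static inline uint32_t lp_off(uint32_t lp_val) { return lp_val & 0x7fff; }
static inline uint32_t lp_len(uint32_t lp_val) { return lp_val >> 17; }

/* Stand-in for HeapTupleData; only the fields the sketch needs */
typedef struct ToyTuple { uint32_t off; uint32_t len; } ToyTuple;

/*
 * Analogous to the post-visibility compaction in page_collect_tuples():
 * shift surviving entries left over the dead ones, return survivor count.
 */
static int
compact_visible(ToyTuple *tuples, const bool *visible, int ntup)
{
    int dst = 0;

    for (int i = 0; i < ntup; i++)
        if (visible[i])
            tuples[dst++] = tuples[i];
    return dst;
}
```

The dst index never overtakes i, so the compaction is safe to do in place on rs_vistuples[] itself, which is what lets the patch drop the separate stack-local tuples array.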
From 0d810ceed77e394883ab0e95eafe36051b546040 Mon Sep 17 00:00:00 2001 From: Amit Langote <[email protected]> Date: Thu, 5 Mar 2026 17:42:19 +0900 Subject: [PATCH v6 2/5] Add RowBatch infrastructure for batched tuple processing Introduce RowBatch, a data carrier that allows table AMs to deliver multiple rows per call and the executor to process them as a group. RowBatch separates three concerns: - am_payload: opaque, AM-owned storage (e.g. HeapBatch with pinned page and tuple headers). The AM allocates this in its scan_begin_batch callback. - slot: a single TupleTableSlot, owned by the scan node, that ops->repoint_slot re-points at the requested row within am_payload when the executor needs tuple data. - max_rows: executor-set upper bound that the AM respects when filling a batch. RowBatch does not own selection/filtering state. Which rows survive qual evaluation is the executor's concern, tracked separately in scan node state. This keeps RowBatch focused on the AM-to-executor data transfer boundary. RowBatchOps provides a vtable for AM-specific operations; currently only repoint_slot is defined. 
--- src/backend/executor/Makefile | 1 + src/backend/executor/execRowBatch.c | 54 ++++++++++++++++++ src/backend/executor/meson.build | 1 + src/include/executor/execRowBatch.h | 88 +++++++++++++++++++++++++++++ src/tools/pgindent/typedefs.list | 2 + 5 files changed, 146 insertions(+) create mode 100644 src/backend/executor/execRowBatch.c create mode 100644 src/include/executor/execRowBatch.h diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 11118d0ce02..99a00e762f6 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ execAsync.o \ + execRowBatch.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/execRowBatch.c b/src/backend/executor/execRowBatch.c new file mode 100644 index 00000000000..6a298813bd8 --- /dev/null +++ b/src/backend/executor/execRowBatch.c @@ -0,0 +1,54 @@ +/*------------------------------------------------------------------------- + * + * execRowBatch.c + * Helpers for RowBatch + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execRowBatch.c + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execRowBatch.h" + +/* + * RowBatchCreate + * Allocate and initialize a new RowBatch envelope. + */ +RowBatch * +RowBatchCreate(int max_rows) +{ + RowBatch *b; + + Assert(max_rows > 0); + + b = palloc(sizeof(RowBatch)); + b->am_payload = NULL; + b->ops = NULL; + b->max_rows = max_rows; + b->nrows = 0; + b->pos = 0; + b->materialized = false; + b->slot = NULL; + + return b; +} + +/* + * RowBatchReset + * Reset an existing RowBatch envelope to empty. 
+ */ +void +RowBatchReset(RowBatch *b, bool drop_slots) +{ + Assert(b != NULL); + + b->nrows = 0; + b->pos = 0; + b->materialized = false; + /* b->slot belongs to the owning PlanState node */ +} diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build index dc45be0b2ce..fd0bf80bacd 100644 --- a/src/backend/executor/meson.build +++ b/src/backend/executor/meson.build @@ -3,6 +3,7 @@ backend_sources += files( 'execAmi.c', 'execAsync.c', + 'execRowBatch.c', 'execCurrent.c', 'execExpr.c', 'execExprInterp.c', diff --git a/src/include/executor/execRowBatch.h b/src/include/executor/execRowBatch.h new file mode 100644 index 00000000000..021fdeecc73 --- /dev/null +++ b/src/include/executor/execRowBatch.h @@ -0,0 +1,88 @@ +/*------------------------------------------------------------------------- + * + * execRowBatch.h + * Executor batch envelope for passing row batch state upward + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execRowBatch.h +*------------------------------------------------------------------------- + */ +#ifndef EXECROWBATCH_H +#define EXECROWBATCH_H + +#include "executor/tuptable.h" + +typedef struct RowBatchOps RowBatchOps; + +/* + * RowBatch + * + * Data carrier from table AM to executor. The AM populates am_payload + * and nrows via scan_getnextbatch(). The executor calls ops->repoint_slot + * to view individual rows when it needs tuple data. + * + * Selection state (which rows survived qual eval) is owned by the executor, + * not the batch. + */ +typedef struct RowBatch +{ + void *am_payload; + const RowBatchOps *ops; + + int max_rows; /* executor-set upper bound */ + int nrows; /* rows TAM put in */ + int pos; /* iteration position */ + bool materialized; /* tuples in slots valid? 
*/ + + TupleTableSlot *slot; /* row view */ +} RowBatch; + +/* + * RowBatchOps -- AM-specific operations on a RowBatch. + * + * Table AMs set b->ops during scan_begin_batch to provide + * callbacks that the executor uses to access batch contents. + * + * repoint_slot re-points the batch's single slot to the tuple at + * index idx within the current batch. The slot remains valid until + * the next call or until the batch is exhausted. + * + * Additional callbacks can be added here as new AMs or executor + * features require them. + */ +typedef struct RowBatchOps +{ + void (*repoint_slot) (RowBatch *b, int idx); +} RowBatchOps; + +/* Create/teardown */ +extern RowBatch *RowBatchCreate(int max_rows); +extern void RowBatchReset(RowBatch *b, bool drop_slots); + +/* Validation */ +static inline bool +RowBatchIsValid(RowBatch *b) +{ + return b != NULL && b->max_rows > 0; +} + +/* Iteration over materialized slots */ +static inline bool +RowBatchHasMore(RowBatch *b) +{ + return b->pos < b->nrows; +} + +static inline TupleTableSlot * +RowBatchGetNextSlot(RowBatch *b) +{ + if (b->pos >= b->nrows) + return NULL; + b->ops->repoint_slot(b, b->pos++); + return b->slot; +} + +#endif /* EXECROWBATCH_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 52f8603a7be..a2b0b1d99d4 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2663,6 +2663,8 @@ RoleSpec RoleSpecType RoleStmtType RollupData +RowBatch +RowBatchOps RowCompareExpr RowExpr RowIdentityVarInfo -- 2.47.3
From c5f58f57cda191408855ab243c05f15580ca5eef Mon Sep 17 00:00:00 2001 From: Amit Langote <[email protected]> Date: Sat, 20 Dec 2025 23:09:37 +0900 Subject: [PATCH v6 5/5] Add EXPLAIN (BATCHES) option for tuple batching statistics Add a BATCHES option to EXPLAIN that reports per-node batch statistics when a node uses batch mode execution. For nodes that support batching (currently SeqScan), this shows the number of batches fetched along with average, minimum, and maximum rows per batch. Output is supported in both text and non-text formats. Add regression tests covering text output, JSON format, filtered scans, LIMIT, and disabled batching. Discussion: https://postgr.es/m/CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com --- src/backend/commands/explain.c | 44 +++++++++++ src/backend/commands/explain_state.c | 8 ++ src/backend/executor/execRowBatch.c | 44 ++++++++++- src/backend/executor/nodeSeqscan.c | 8 +- src/include/commands/explain_state.h | 1 + src/include/executor/execRowBatch.h | 22 +++++- src/include/executor/instrument.h | 1 + src/test/regress/expected/explain.out | 107 ++++++++++++++++++++++++++ src/test/regress/sql/explain.sql | 59 ++++++++++++++ 9 files changed, 291 insertions(+), 3 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 296ea8a1ed2..b507fec0dab 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -22,6 +22,7 @@ #include "commands/explain_format.h" #include "commands/explain_state.h" #include "commands/prepare.h" +#include "executor/execRowBatch.h" #include "foreign/fdwapi.h" #include "jit/jit.h" #include "libpq/pqformat.h" @@ -519,6 +520,8 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, instrument_option |= INSTRUMENT_BUFFERS; if (es->wal) instrument_option |= INSTRUMENT_WAL; + if (es->batches) + instrument_option |= INSTRUMENT_BATCHES; /* * We always collect timing for the entire statement, even when node-level @@ 
-1372,6 +1375,7 @@ ExplainNode(PlanState *planstate, List *ancestors, int save_indent = es->indent; bool haschildren; bool isdisabled; + RowBatch *batch = NULL; /* * Prepare per-worker output buffers, if needed. We'll append the data in @@ -2297,6 +2301,46 @@ ExplainNode(PlanState *planstate, List *ancestors, if (es->wal && planstate->instrument) show_wal_usage(es, &planstate->instrument->walusage); + /* BATCHES */ + switch (nodeTag(plan)) + { + case T_SeqScan: + batch = castNode(SeqScanState, planstate)->batch; + break; + default: + break; + } + + if (es->batches && batch) + { + RowBatchStats *stats = batch->stats; + + Assert(stats); + if (stats->batches > 0) + { + if (es->format == EXPLAIN_FORMAT_TEXT) + { + ExplainIndentText(es); + appendStringInfo(es->str, + "Batches: %lld Avg Rows: %.1f Max: %d Min: %d\n", + (long long) stats->batches, + RowBatchAvgRows(batch), stats->max_rows, + stats->min_rows == INT_MAX ? 0 : + stats->min_rows); + } + else + { + ExplainPropertyInteger("Batches", NULL, stats->batches, es); + ExplainPropertyFloat("Average Batch Rows", NULL, + RowBatchAvgRows(batch), 1, es); + ExplainPropertyInteger("Max Batch Rows", NULL, stats->max_rows, es); + ExplainPropertyInteger("Min Batch Rows", NULL, + stats->min_rows == INT_MAX ? 
0 : + stats->min_rows, es); + } + } + } + /* Prepare per-worker buffer/WAL usage */ if (es->workers_state && (es->buffers || es->wal) && es->verbose) { diff --git a/src/backend/commands/explain_state.c b/src/backend/commands/explain_state.c index 77f59b8e500..28022a171cd 100644 --- a/src/backend/commands/explain_state.c +++ b/src/backend/commands/explain_state.c @@ -159,6 +159,8 @@ ParseExplainOptionList(ExplainState *es, List *options, ParseState *pstate) "EXPLAIN", opt->defname, p), parser_errposition(pstate, opt->location))); } + else if (strcmp(opt->defname, "batches") == 0) + es->batches = defGetBoolean(opt); else if (!ApplyExtensionExplainOption(es, opt, pstate)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -198,6 +200,12 @@ ParseExplainOptionList(ExplainState *es, List *options, ParseState *pstate) errmsg("%s options %s and %s cannot be used together", "EXPLAIN", "ANALYZE", "GENERIC_PLAN"))); + /* check that BATCHES is used with EXPLAIN ANALYZE */ + if (es->batches && !es->analyze) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("EXPLAIN option %s requires ANALYZE", "BATCHES"))); + /* if the summary was not set explicitly, set default value */ es->summary = (summary_set) ? es->summary : es->analyze; diff --git a/src/backend/executor/execRowBatch.c b/src/backend/executor/execRowBatch.c index 6a298813bd8..6ef54deca04 100644 --- a/src/backend/executor/execRowBatch.c +++ b/src/backend/executor/execRowBatch.c @@ -20,7 +20,7 @@ * Allocate and initialize a new RowBatch envelope. 
*/ RowBatch * -RowBatchCreate(int max_rows) +RowBatchCreate(int max_rows, bool track_stats) { RowBatch *b; @@ -35,6 +35,20 @@ RowBatchCreate(int max_rows) b->materialized = false; b->slot = NULL; + if (track_stats) + { + RowBatchStats *stats = palloc_object(RowBatchStats); + + stats->batches = 0; + stats->rows = 0; + stats->max_rows = 0; + stats->min_rows = INT_MAX; + + b->stats = stats; + } + else + b->stats = NULL; + return b; } @@ -52,3 +66,31 @@ RowBatchReset(RowBatch *b, bool drop_slots) b->materialized = false; /* b->slot belongs to the owning PlanState node */ } + +void +RowBatchRecordStats(RowBatch *b, int rows) +{ + RowBatchStats *stats = b->stats; + + if (stats == NULL) + return; + + stats->batches++; + stats->rows += rows; + if (rows > stats->max_rows) + stats->max_rows = rows; + if (rows < stats->min_rows && rows > 0) + stats->min_rows = rows; +} + +double +RowBatchAvgRows(RowBatch *b) +{ + RowBatchStats *stats = b->stats; + + Assert(stats != NULL); + if (stats->batches == 0) + return 0.0; + + return (double) stats->rows / stats->batches; +} diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index b41d18b67e3..c1527be946a 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -245,8 +245,12 @@ SeqScanCanUseBatching(SeqScanState *scanstate, int eflags) static void SeqScanInitBatching(SeqScanState *scanstate) { - RowBatch *batch = RowBatchCreate(MaxHeapTuplesPerPage); + RowBatch *batch; + EState *estate = scanstate->ss.ps.state; + bool track_stats = estate->es_instrument && + (estate->es_instrument & INSTRUMENT_BATCHES); + batch = RowBatchCreate(MaxHeapTuplesPerPage, track_stats); batch->slot = scanstate->ss.ss_ScanTupleSlot; scanstate->batch = batch; @@ -347,6 +351,8 @@ SeqNextBatch(SeqScanState *node) if (!table_scan_getnextbatch(scandesc, b, direction)) return false; + RowBatchRecordStats(b, b->nrows); + return true; } diff --git a/src/include/commands/explain_state.h 
b/src/include/commands/explain_state.h index 5a48bc6fbb1..579ca4cfa20 100644 --- a/src/include/commands/explain_state.h +++ b/src/include/commands/explain_state.h @@ -56,6 +56,7 @@ typedef struct ExplainState bool memory; /* print planner's memory usage information */ bool settings; /* print modified settings */ bool generic; /* generate a generic plan */ + bool batches; /* print batch statistics */ ExplainSerializeOption serialize; /* serialize the query's output? */ ExplainFormat format; /* output format */ /* state for output formatting --- not reset for each new plan tree */ diff --git a/src/include/executor/execRowBatch.h b/src/include/executor/execRowBatch.h index 021fdeecc73..ad0b4763b70 100644 --- a/src/include/executor/execRowBatch.h +++ b/src/include/executor/execRowBatch.h @@ -13,9 +13,12 @@ #ifndef EXECROWBATCH_H #define EXECROWBATCH_H +#include <limits.h> + #include "executor/tuptable.h" typedef struct RowBatchOps RowBatchOps; +typedef struct RowBatchStats RowBatchStats; /* * RowBatch @@ -38,6 +41,9 @@ typedef struct RowBatch bool materialized; /* tuples in slots valid? */ TupleTableSlot *slot; /* row view */ + + RowBatchStats *stats; /* NULL if instrumentation stats + * are not requested */ } RowBatch; /* @@ -58,8 +64,17 @@ typedef struct RowBatchOps void (*repoint_slot) (RowBatch *b, int idx); } RowBatchOps; +/* Instrumentation stats populated for EXPLAIN ANALYZE BATCHES */ +typedef struct RowBatchStats +{ + int64 batches; /* total number of batches fetched */ + int64 rows; /* total tuples across all batches */ + int max_rows; /* max rows in any single batch */ + int min_rows; /* min rows in any single batch (non-zero) */ +} RowBatchStats; + /* Create/teardown */ -extern RowBatch *RowBatchCreate(int max_rows); +extern RowBatch *RowBatchCreate(int max_rows, bool track_stats); extern void RowBatchReset(RowBatch *b, bool drop_slots); /* Validation */ @@ -85,4 +100,9 @@ RowBatchGetNextSlot(RowBatch *b) return b->slot; } +/* === Batching stats. 
===*/ + +extern void RowBatchRecordStats(RowBatch *b, int rows); +extern double RowBatchAvgRows(RowBatch *b); + #endif /* EXECROWBATCH_H */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index 9759f3ea5d8..bee69b4ac8f 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -64,6 +64,7 @@ typedef enum InstrumentOption INSTRUMENT_BUFFERS = 1 << 1, /* needs buffer usage */ INSTRUMENT_ROWS = 1 << 2, /* needs row count */ INSTRUMENT_WAL = 1 << 3, /* needs WAL usage */ + INSTRUMENT_BATCHES = 1 << 4, /* needs batches */ INSTRUMENT_ALL = PG_INT32_MAX } InstrumentOption; diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out index 7c1f26b182c..950de5a9d78 100644 --- a/src/test/regress/expected/explain.out +++ b/src/test/regress/expected/explain.out @@ -822,3 +822,110 @@ select explain_filter('explain (analyze,buffers off,costs off) select sum(n) ove (9 rows) reset work_mem; +-- Test BATCHES option +set executor_batch_rows = 64; +create temp table batch_test (a int, b text); +insert into batch_test select i, repeat('x', 100) from generate_series(1, 10000) i; +analyze batch_test; +-- BATCHES without ANALYZE should error +explain (batches, costs off) select * from batch_test; +ERROR: EXPLAIN option BATCHES requires ANALYZE +-- BATCHES without ANALYZE but with other options +explain (batches, buffers off, costs off) select * from batch_test; +ERROR: EXPLAIN option BATCHES requires ANALYZE +-- Basic: verify batch stats line appears in text format +select explain_filter('explain (analyze, batches, buffers off, costs off) select * from batch_test'); + explain_filter +---------------------------------------------------------------- + Seq Scan on batch_test (actual time=N.N..N.N rows=N.N loops=N) + Batches: N Avg Rows: N.N Max: N Min: N + Planning Time: N.N ms + Execution Time: N.N ms +(4 rows) + +-- With filter: batch line still appears +select explain_filter('explain (analyze, 
batches, buffers off, costs off) select * from batch_test where a > 5000'); + explain_filter +---------------------------------------------------------------- + Seq Scan on batch_test (actual time=N.N..N.N rows=N.N loops=N) + Filter: (a > N) + Rows Removed by Filter: N + Batches: N Avg Rows: N.N Max: N Min: N + Planning Time: N.N ms + Execution Time: N.N ms +(6 rows) + +-- With non-batchable qual (OR): batching still active but +-- batch qual falls back to per-tuple ExecQual +select explain_filter('explain (analyze, batches, buffers off, costs off) select * from batch_test where a > 5000 or b is null'); + explain_filter +---------------------------------------------------------------- + Seq Scan on batch_test (actual time=N.N..N.N rows=N.N loops=N) + Filter: ((a > N) OR (b IS NULL)) + Rows Removed by Filter: N + Batches: N Avg Rows: N.N Max: N Min: N + Planning Time: N.N ms + Execution Time: N.N ms +(6 rows) + +-- With LIMIT: batch stats appear on child Seq Scan node +select explain_filter('explain (analyze, batches, buffers off, costs off) select * from batch_test limit 100'); + explain_filter +---------------------------------------------------------------------- + Limit (actual time=N.N..N.N rows=N.N loops=N) + -> Seq Scan on batch_test (actual time=N.N..N.N rows=N.N loops=N) + Batches: N Avg Rows: N.N Max: N Min: N + Planning Time: N.N ms + Execution Time: N.N ms +(5 rows) + +-- Verify batch stats keys present in JSON output +select + j #> '{0,Plan}' ? 'Batches' as has_batches, + j #> '{0,Plan}' ? 'Average Batch Rows' as has_avg, + j #> '{0,Plan}' ? 'Max Batch Rows' as has_max, + j #> '{0,Plan}' ? 'Min Batch Rows' as has_min +from explain_filter_to_json( + 'explain (analyze, batches, buffers off, format json) select * from batch_test' +) as j; + has_batches | has_avg | has_max | has_min +-------------+---------+---------+--------- + t | t | t | t +(1 row) + +-- With LIMIT: batch stats keys on child node in JSON +select + j #> '{0,Plan,Plans,0}' ? 
'Batches' as child_has_batches, + j #> '{0,Plan,Plans,0}' ? 'Average Batch Rows' as child_has_avg, + j #> '{0,Plan,Plans,0}' ? 'Max Batch Rows' as child_has_max, + j #> '{0,Plan,Plans,0}' ? 'Min Batch Rows' as child_has_min +from explain_filter_to_json( + 'explain (analyze, batches, buffers off, format json) select * from batch_test limit 100' +) as j; + child_has_batches | child_has_avg | child_has_max | child_has_min +-------------------+---------------+---------------+--------------- + t | t | t | t +(1 row) + +-- Batching disabled: no batch stats in text output +set executor_batch_rows = 0; +select explain_filter('explain (analyze, batches, buffers off, costs off) select * from batch_test'); + explain_filter +---------------------------------------------------------------- + Seq Scan on batch_test (actual time=N.N..N.N rows=N.N loops=N) + Planning Time: N.N ms + Execution Time: N.N ms +(3 rows) + +-- Batching disabled: no batch keys in JSON +select + j #> '{0,Plan}' ? 'Batches' as has_batches +from explain_filter_to_json( + 'explain (analyze, batches, buffers off, format json) select * from batch_test' +) as j; + has_batches +------------- + f +(1 row) + +reset executor_batch_rows; diff --git a/src/test/regress/sql/explain.sql b/src/test/regress/sql/explain.sql index ebdab42604b..55acb9058ce 100644 --- a/src/test/regress/sql/explain.sql +++ b/src/test/regress/sql/explain.sql @@ -188,3 +188,62 @@ select explain_filter('explain (analyze,buffers off,costs off) select sum(n) ove -- Test tuplestore storage usage in Window aggregate (memory and disk case, final result is disk) select explain_filter('explain (analyze,buffers off,costs off) select sum(n) over(partition by m) from (SELECT n < 3 as m, n from generate_series(1,2500) a(n))'); reset work_mem; + +-- Test BATCHES option +set executor_batch_rows = 64; + +create temp table batch_test (a int, b text); +insert into batch_test select i, repeat('x', 100) from generate_series(1, 10000) i; +analyze batch_test; + +-- 
BATCHES without ANALYZE should error +explain (batches, costs off) select * from batch_test; + +-- BATCHES without ANALYZE but with other options +explain (batches, buffers off, costs off) select * from batch_test; + +-- Basic: verify batch stats line appears in text format +select explain_filter('explain (analyze, batches, buffers off, costs off) select * from batch_test'); + +-- With filter: batch line still appears +select explain_filter('explain (analyze, batches, buffers off, costs off) select * from batch_test where a > 5000'); + +-- With non-batchable qual (OR): batching still active but +-- batch qual falls back to per-tuple ExecQual +select explain_filter('explain (analyze, batches, buffers off, costs off) select * from batch_test where a > 5000 or b is null'); + +-- With LIMIT: batch stats appear on child Seq Scan node +select explain_filter('explain (analyze, batches, buffers off, costs off) select * from batch_test limit 100'); + +-- Verify batch stats keys present in JSON output +select + j #> '{0,Plan}' ? 'Batches' as has_batches, + j #> '{0,Plan}' ? 'Average Batch Rows' as has_avg, + j #> '{0,Plan}' ? 'Max Batch Rows' as has_max, + j #> '{0,Plan}' ? 'Min Batch Rows' as has_min +from explain_filter_to_json( + 'explain (analyze, batches, buffers off, format json) select * from batch_test' +) as j; + +-- With LIMIT: batch stats keys on child node in JSON +select + j #> '{0,Plan,Plans,0}' ? 'Batches' as child_has_batches, + j #> '{0,Plan,Plans,0}' ? 'Average Batch Rows' as child_has_avg, + j #> '{0,Plan,Plans,0}' ? 'Max Batch Rows' as child_has_max, + j #> '{0,Plan,Plans,0}' ? 
'Min Batch Rows' as child_has_min +from explain_filter_to_json( + 'explain (analyze, batches, buffers off, format json) select * from batch_test limit 100' +) as j; + +-- Batching disabled: no batch stats in text output +set executor_batch_rows = 0; +select explain_filter('explain (analyze, batches, buffers off, costs off) select * from batch_test'); + +-- Batching disabled: no batch keys in JSON +select + j #> '{0,Plan}' ? 'Batches' as has_batches +from explain_filter_to_json( + 'explain (analyze, batches, buffers off, format json) select * from batch_test' +) as j; + +reset executor_batch_rows; -- 2.47.3
From 074facc85aae66ebab49b08eadf9957a6dca778d Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Thu, 5 Mar 2026 11:28:16 +0900
Subject: [PATCH v6 4/5] SeqScan: add batch-driven variants returning slots

Teach SeqScan to drive the table AM via the new batch API added in the
previous commit, while still returning one TupleTableSlot at a time to
callers.  This reduces per-tuple AM crossings without changing the node
interface seen by parents.

SeqScanState gains a RowBatch pointer that holds the current batch when
batching is active.  Batch state is localized to SeqScanState -- no
changes to PlanState or ScanState.

Add executor_batch_rows GUC (DEVELOPER_OPTIONS, default 64) to control
the maximum batch size.  Setting it to 0 disables batching.  XXX
currently ignored when reading from heapam tables.

Wire up runtime selection in ExecInitSeqScan via
SeqScanCanUseBatching().  When executor_batch_rows > 1, EPQ is
inactive, the scan is forward-only, and the relation's AM supports
batching, ExecProcNode is set to a batch-driven variant.  Otherwise the
non-batch path is used with zero overhead.

Plan shape and EXPLAIN output remain unchanged; only the internal tuple
flow differs when batching is enabled.
Reviewed-by: Daniil Davydov <[email protected]>
Reviewed-by: ChangAo Chen <[email protected]>
Discussion: https://postgr.es/m/CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com
---
 src/backend/executor/nodeSeqscan.c        | 276 ++++++++++++++++++++++
 src/backend/utils/init/globals.c          |   3 +
 src/backend/utils/misc/guc_parameters.dat |   9 +
 src/include/miscadmin.h                   |   1 +
 src/include/nodes/execnodes.h             |   2 +
 5 files changed, 291 insertions(+)

diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 8f219f60a93..b41d18b67e3 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -29,12 +29,17 @@
 #include "access/relscan.h"
 #include "access/tableam.h"
+#include "executor/execRowBatch.h"
 #include "executor/execScan.h"
 #include "executor/executor.h"
 #include "executor/nodeSeqscan.h"
 #include "utils/rel.h"
 
 static TupleTableSlot *SeqNext(SeqScanState *node);
+static TupleTableSlot *ExecSeqScanBatchSlot(PlanState *pstate);
+static TupleTableSlot *ExecSeqScanBatchSlotWithQual(PlanState *pstate);
+static TupleTableSlot *ExecSeqScanBatchSlotWithProject(PlanState *pstate);
+static TupleTableSlot *ExecSeqScanBatchSlotWithQualProject(PlanState *pstate);
 
 /* ----------------------------------------------------------------
  *						Scan Support
@@ -203,6 +208,271 @@ ExecSeqScanEPQ(PlanState *pstate)
 							(ExecScanRecheckMtd) SeqRecheck);
 }
 
+/* ----------------------------------------------------------------
+ *						Batch Support
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * SeqScanCanUseBatching
+ *		Check whether this SeqScan can use batch mode execution.
+ *
+ * Batching requires: the GUC is enabled, no EPQ recheck is active, the scan
+ * is forward-only, and the table AM supports batching with the current
+ * snapshot (see table_supports_batching()).
+ */
+static bool
+SeqScanCanUseBatching(SeqScanState *scanstate, int eflags)
+{
+	Relation	relation = scanstate->ss.ss_currentRelation;
+
+	return executor_batch_rows > 1 &&
+		relation &&
+		table_supports_batching(relation,
+								scanstate->ss.ps.state->es_snapshot) &&
+		!(eflags & EXEC_FLAG_BACKWARD) &&
+		scanstate->ss.ps.state->es_epq_active == NULL;
+}
+
+/*
+ * SeqScanInitBatching
+ *		Set up batch execution state and select the appropriate
+ *		ExecProcNode variant for batch mode.
+ *
+ * Called from ExecInitSeqScan when SeqScanCanUseBatching returns true.
+ * Overwrites the ExecProcNode pointer set by the non-batch path.
+ */
+static void
+SeqScanInitBatching(SeqScanState *scanstate)
+{
+	RowBatch   *batch = RowBatchCreate(MaxHeapTuplesPerPage);
+
+	batch->slot = scanstate->ss.ss_ScanTupleSlot;
+	scanstate->batch = batch;
+
+	/* Choose batch variant */
+	if (scanstate->ss.ps.qual == NULL)
+	{
+		if (scanstate->ss.ps.ps_ProjInfo == NULL)
+			scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlot;
+		else
+			scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithProject;
+	}
+	else
+	{
+		if (scanstate->ss.ps.ps_ProjInfo == NULL)
+			scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQual;
+		else
+			scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQualProject;
+	}
+}
+
+/*
+ * SeqScanResetBatching
+ *		Reset or tear down batch execution state.
+ *
+ * When drop is false (rescan), resets the RowBatch and releases any
+ * AM-held resources like buffer pins, but keeps allocations for reuse.
+ * When drop is true (end of node), frees everything.
+ */
+static void
+SeqScanResetBatching(SeqScanState *scanstate, bool drop)
+{
+	RowBatch   *b = scanstate->batch;
+
+	if (b)
+	{
+		RowBatchReset(b, drop);
+		if (b->am_payload)
+		{
+			if (drop)
+			{
+				table_scan_end_batch(scanstate->ss.ss_currentScanDesc, b);
+				b->am_payload = NULL;
+			}
+			else
+				table_scan_reset_batch(scanstate->ss.ss_currentScanDesc, b);
+		}
+		if (drop)
+			pfree(b);
+	}
+}
+
+/*
+ * SeqNextBatch
+ *		Fetch the next batch of tuples from the table AM.
+ *
+ * Lazily initializes the scan descriptor and AM batch state on first
+ * call.  Returns false at end of scan.
+ */
+static bool
+SeqNextBatch(SeqScanState *node)
+{
+	TableScanDesc scandesc;
+	EState	   *estate;
+	ScanDirection direction;
+	RowBatch   *b = node->batch;
+
+	Assert(b != NULL);
+
+	/*
+	 * get information from the estate and scan state
+	 */
+	scandesc = node->ss.ss_currentScanDesc;
+	estate = node->ss.ps.state;
+	direction = estate->es_direction;
+	Assert(ScanDirectionIsForward(direction));
+
+	if (scandesc == NULL)
+	{
+		/*
+		 * We reach here if the scan is not parallel, or if we're serially
+		 * executing a scan that was planned to be parallel.
+		 */
+		scandesc = table_beginscan(node->ss.ss_currentRelation,
+								   estate->es_snapshot,
+								   0, NULL);
+		node->ss.ss_currentScanDesc = scandesc;
+	}
+
+	/* Lazily create the AM batch payload. */
+	if (b->am_payload == NULL)
+	{
+		const TableAmRoutine *tam PG_USED_FOR_ASSERTS_ONLY = scandesc->rs_rd->rd_tableam;
+
+		Assert(tam && tam->scan_begin_batch);
+		table_scan_begin_batch(scandesc, b);
+	}
+
+	if (!table_scan_getnextbatch(scandesc, b, direction))
+		return false;
+
+	return true;
+}
+
+/*
+ * SeqScanBatchSlot
+ *		Core loop for batch-driven SeqScan variants.
+ *
+ * Internally fetches tuples in batches from the table AM, but returns
+ * one slot at a time to preserve the single-slot interface expected by
+ * parent nodes.  When the current batch is exhausted, fetches and
+ * materializes the next one.
+ *
+ * qual and projInfo are passed explicitly so the compiler can eliminate
+ * dead branches when inlined into the typed wrapper functions (e.g.
+ * ExecSeqScanBatchSlot passes NULL for both).
+ *
+ * EPQ is not supported in the batch path; asserted at entry.
+ */
+static inline TupleTableSlot *
+SeqScanBatchSlot(SeqScanState *node,
+				 ExprState *qual, ProjectionInfo *projInfo)
+{
+	ExprContext *econtext = node->ss.ps.ps_ExprContext;
+	RowBatch   *b = node->batch;
+
+	/* Batch path does not support EPQ */
+	Assert(node->ss.ps.state->es_epq_active == NULL);
+	Assert(RowBatchIsValid(b));
+
+	for (;;)
+	{
+		TupleTableSlot *in;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Get next input slot from current batch, or refill */
+		if (!RowBatchHasMore(b))
+		{
+			if (!SeqNextBatch(node))
+				return NULL;
+		}
+
+		in = RowBatchGetNextSlot(b);
+		Assert(in);
+
+		/* No qual, no projection: direct return */
+		if (qual == NULL && projInfo == NULL)
+			return in;
+
+		ResetExprContext(econtext);
+		econtext->ecxt_scantuple = in;
+
+		/* Check qual if present */
+		if (qual != NULL && !ExecQual(qual, econtext))
+		{
+			InstrCountFiltered1(node, 1);
+			continue;
+		}
+
+		/* Project if needed, otherwise return scan tuple directly */
+		if (projInfo != NULL)
+			return ExecProject(projInfo);
+
+		return in;
+	}
+}
+
+static TupleTableSlot *
+ExecSeqScanBatchSlot(PlanState *pstate)
+{
+	SeqScanState *node = castNode(SeqScanState, pstate);
+
+	Assert(pstate->state->es_epq_active == NULL);
+	Assert(pstate->qual == NULL);
+	Assert(pstate->ps_ProjInfo == NULL);
+
+	return SeqScanBatchSlot(node, NULL, NULL);
+}
+
+static TupleTableSlot *
+ExecSeqScanBatchSlotWithQual(PlanState *pstate)
+{
+	SeqScanState *node = castNode(SeqScanState, pstate);
+
+	/*
+	 * Use pg_assume() for != NULL tests to make the compiler realize no
+	 * runtime check for the field is needed in SeqScanBatchSlot().
+	 */
+	Assert(pstate->state->es_epq_active == NULL);
+	pg_assume(pstate->qual != NULL);
+	Assert(pstate->ps_ProjInfo == NULL);
+
+	return SeqScanBatchSlot(node, pstate->qual, NULL);
+}
+
+/*
+ * Variant of ExecSeqScan() but when projection is required.
+ */
+static TupleTableSlot *
+ExecSeqScanBatchSlotWithProject(PlanState *pstate)
+{
+	SeqScanState *node = castNode(SeqScanState, pstate);
+
+	Assert(pstate->state->es_epq_active == NULL);
+	Assert(pstate->qual == NULL);
+	pg_assume(pstate->ps_ProjInfo != NULL);
+
+	return SeqScanBatchSlot(node, NULL, pstate->ps_ProjInfo);
+}
+
+/*
+ * Variant of ExecSeqScan() but when qual evaluation and projection are
+ * required.
+ */
+static TupleTableSlot *
+ExecSeqScanBatchSlotWithQualProject(PlanState *pstate)
+{
+	SeqScanState *node = castNode(SeqScanState, pstate);
+
+	Assert(pstate->state->es_epq_active == NULL);
+	pg_assume(pstate->qual != NULL);
+	pg_assume(pstate->ps_ProjInfo != NULL);
+
+	return SeqScanBatchSlot(node, pstate->qual, pstate->ps_ProjInfo);
+}
+
 /* ----------------------------------------------------------------
  *						ExecInitSeqScan
  * ----------------------------------------------------------------
@@ -281,6 +551,9 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 			scanstate->ss.ps.ExecProcNode = ExecSeqScanWithQualProject;
 	}
 
+	if (SeqScanCanUseBatching(scanstate, eflags))
+		SeqScanInitBatching(scanstate);
+
 	return scanstate;
 }
 
@@ -300,6 +573,8 @@ ExecEndSeqScan(SeqScanState *node)
 	 */
 	scanDesc = node->ss.ss_currentScanDesc;
 
+	SeqScanResetBatching(node, true);
+
 	/*
 	 * close heap scan
 	 */
@@ -329,6 +604,7 @@ ExecReScanSeqScan(SeqScanState *node)
 	table_rescan(scan,			/* scan desc */
 				 NULL);			/* new scan keys */
 
+	SeqScanResetBatching(node, false);
 	ExecScanReScan((ScanState *) node);
 }
 
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 36ad708b360..535e29d7823 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -165,3 +165,6 @@ int			notify_buffers = 16;
 int			serializable_buffers = 32;
 int			subtransaction_buffers = 0;
 int			transaction_buffers = 0;
+
+/* executor batching */
+int			executor_batch_rows = 64;
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index a5a0edf2534..e1eadcf643d 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1004,6 +1004,15 @@
   boot_val => 'true',
 },
 
+{ name => 'executor_batch_rows', type => 'int', context => 'PGC_USERSET', group => 'DEVELOPER_OPTIONS',
+  short_desc => 'Number of rows to include in batches during execution.',
+  flags => 'GUC_NOT_IN_SAMPLE',
+  variable => 'executor_batch_rows',
+  boot_val => '64',
+  min => '0',
+  max => '1024',
+},
+
 { name => 'exit_on_error', type => 'bool', context => 'PGC_USERSET', group => 'ERROR_HANDLING_OPTIONS',
   short_desc => 'Terminate session on any error.',
   variable => 'ExitOnAnyError',
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index f16f35659b9..ad406bf53f3 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -288,6 +288,7 @@ extern PGDLLIMPORT double VacuumCostDelay;
 extern PGDLLIMPORT int VacuumCostBalance;
 extern PGDLLIMPORT bool VacuumCostActive;
 
+extern PGDLLIMPORT int executor_batch_rows;
 
 /* in utils/misc/stack_depth.c */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 0716c5a9aed..6f038cfcc60 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -67,6 +67,7 @@
 typedef struct TupleTableSlot TupleTableSlot;
 typedef struct TupleTableSlotOps TupleTableSlotOps;
 typedef struct WalUsage WalUsage;
 typedef struct WorkerInstrumentation WorkerInstrumentation;
+typedef struct RowBatch RowBatch;
 
 /* ----------------
@@ -1644,6 +1645,7 @@ typedef struct SeqScanState
 {
 	ScanState	ss;				/* its first field is NodeTag */
 	Size		pscan_len;		/* size of parallel heap scan descriptor */
+	RowBatch   *batch;			/* NULL if batching disabled */
 } SeqScanState;
 
 /* ----------------
-- 
2.47.3
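As an aside for reviewers, the wrapper-dispatch trick in patch 4 (one static-inline core loop taking qual/projection as parameters, plus typed wrappers passing compile-time NULLs so the compiler drops the dead branches) can be seen in isolation in the following standalone sketch. Every name here (row_t, scan_core, qual_even, and so on) is invented for illustration; none of it is PostgreSQL code.

```c
#include <stddef.h>

/* Invented stand-ins for a tuple and for qual/projection callbacks. */
typedef struct { int a; } row_t;
typedef int (*qual_fn) (const row_t *);
typedef row_t (*proj_fn) (const row_t *);

/* A tiny fixed "batch" plus a read cursor, standing in for RowBatch. */
static row_t rows[] = {{1}, {2}, {3}, {4}};
static size_t cursor = 0;

/*
 * Core loop.  Because qual/proj are compile-time constants at every
 * inline site below, the compiler can discard the NULL checks and dead
 * branches -- the effect the patch attributes to inlining plus
 * pg_assume() in the real SeqScanBatchSlot().
 */
static inline const row_t *
scan_core(qual_fn qual, proj_fn proj, row_t *out)
{
	while (cursor < sizeof(rows) / sizeof(rows[0]))
	{
		const row_t *in = &rows[cursor++];

		if (qual != NULL && !qual(in))
			continue;			/* filtered out, like a failing qual */

		if (proj != NULL)
		{
			*out = proj(in);	/* like projecting into a result slot */
			return out;
		}
		return in;				/* direct return, no projection */
	}
	return NULL;				/* scan exhausted */
}

/* Example qual and projection: keep even values, double them. */
static int qual_even(const row_t *r) { return r->a % 2 == 0; }
static row_t proj_double(const row_t *r) { row_t o = {r->a * 2}; return o; }

/* Typed wrappers, analogous to the ExecSeqScanBatchSlot* variants. */
static const row_t *scan_plain(row_t *out) { return scan_core(NULL, NULL, out); }
static const row_t *scan_qual_proj(row_t *out) { return scan_core(qual_even, proj_double, out); }
```

Compiled at -O2, scan_plain contains no qual or projection code at all, which is why the patch can keep a single core loop without paying for the unused paths in the common no-qual, no-projection case.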
