From 24a3d208db93312788745882a01b526957919966 Mon Sep 17 00:00:00 2001
From: Amit Langote <amitlan@postgresql.org>
Date: Sat, 20 Dec 2025 17:21:56 +0900
Subject: [PATCH v4 1/3] Add batch table AM API and heapam implementation

Introduce new table AM callbacks to fetch multiple tuples per call.
This reduces per-tuple call overhead by letting executor nodes work
in batches.

Define a HeapBatch structure and supporting code in heapam.h, plus the
corresponding table AM callbacks and inline wrappers in tableam.h.
Batches are limited to tuples from a single page and at most
EXEC_BATCH_ROWS (currently 64) entries.

Provide initial heapam support with heapgettup_pagemode_batch().
No executor node is switched over yet; a later commit will adapt
SeqScan to use this API. Other nodes may adopt it in the future.
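
To illustrate the intended call pattern, a caller would drive the API
roughly as follows (a sketch only; turning the AM-specific batch
contents into slots is left to the follow-up commit):

    void   *batch = table_scan_begin_batch(scan, EXEC_BATCH_ROWS);
    int     n;

    while ((n = table_scan_getnextbatch(scan, batch,
                                        ForwardScanDirection)) > 0)
    {
        /* consume the n tuples held in the AM-specific batch state */
    }
    table_scan_end_batch(scan, batch);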

Also add pgstat_count_heap_getnext_batch() to record batched fetches
in pgstat.

Reviewed-by: Daniil Davydov <3danissimo@gmail.com>
Discussion: https://postgr.es/m/CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com
---
 src/backend/access/heap/heapam.c         | 219 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   4 +
 src/include/access/heapam.h              |  18 ++
 src/include/access/tableam.h             |  58 ++++++
 src/include/pgstat.h                     |   5 +
 5 files changed, 303 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 6daf4a87dec..fcc0813f139 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1023,7 +1023,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 					int nkeys,
 					ScanKey key)
 {
-	HeapTuple	tuple = &(scan->rs_ctup);
+	HeapTuple	tuple = &scan->rs_ctup;
 	Page		page;
 	uint32		lineindex;
 	uint32		linesleft;
@@ -1104,6 +1104,132 @@ continue_page:
 	scan->rs_inited = false;
 }
 
+/*
+ * heapgettup_pagemode_batch
+ *		Collect up to 'maxitems' visible tuples from a single page in page mode.
+ *
+ * This function returns a *batch* of tuples from one heap page. If the
+ * current page (as tracked by the scan desc) has no more tuples left,
+ * it will advance to the next page and prepare it (via heap_prepare_pagescan).
+ * It will not cross a page boundary while filling the batch.
+ *
+ * Return value:
+ *		number of tuples written into 'tdata' (0 at end-of-scan).
+ *
+ * Side effects:
+ *	- Ensures rs_cbuf pins the page from which tuples were produced.
+ *	- Sets rs_cblock, rs_cindex, rs_ntuples consistently (same as
+ *	  heapgettup_pagemode's inner-loop effects).
+ *	- Does *not* change buffer pin counts except through normal page
+ *	  transitions performed by heap_fetch_next_buffer().
+ */
+static int
+heapgettup_pagemode_batch(HeapScanDesc scan,
+						  ScanDirection dir,
+						  int nkeys, ScanKey key,
+						  HeapTupleData *tdata,
+						  int maxitems)
+{
+	Page		page;
+	uint32		lineindex;
+	uint32		linesleft;
+	int			nout = 0;
+	Relation	rel = scan->rs_base.rs_rd;
+	Oid			tableOid = RelationGetRelid(rel);
+	TupleDesc	tupdesc = key ? RelationGetDescr(rel) : NULL;
+
+	/*
+	 * Current batching limitations (may be relaxed in future):
+	 *
+	 * - Forward scans only: backward scan support would require changes to
+	 *   batch iteration and page advancement logic.
+	 *
+	 * - Pagemode required: batching relies on the pre-built rs_vistuples[]
+	 *   array from heap_prepare_pagescan(). This is guaranteed by
+	 *   ScanCanUseBatching() which only enables batching when SO_ALLOW_PAGEMODE
+	 *   is set. Unlike heap_getnextslot, we don't support dynamic fallback to
+	 *   tuple-at-a-time mode since the batch execution path is selected at
+	 *   ExecInit time.
+	 */
+	Assert(ScanDirectionIsForward(dir));
+	Assert(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE);
+	Assert(maxitems > 0);
+
+	/*
+	 * If we have no current page (or the current page is exhausted),
+	 * advance to the next page that has any visible tuples and prepare it.
+	 * This mirrors the outer loop of heapgettup_pagemode(), but we stop
+	 * as soon as we have a prepared page; we never produce from two pages.
+	 */
+next_page:
+	for (;;)
+	{
+		if (BufferIsValid(scan->rs_cbuf))
+		{
+			/* Are there more visible tuples left on this page? */
+			lineindex = scan->rs_cindex + dir;
+			linesleft = (lineindex <= (uint32) scan->rs_ntuples) ?
+				(scan->rs_ntuples - lineindex) : 0;
+			if (linesleft > 0)
+				break;	/* continue on this page */
+		}
+
+		/* Move to next page and prepare its visible tuple list. */
+		heap_fetch_next_buffer(scan, dir);
+
+		if (!BufferIsValid(scan->rs_cbuf))
+		{
+			/* end of scan; keep rs_cbuf invalid like heapgettup_pagemode */
+			scan->rs_cblock = InvalidBlockNumber;
+			scan->rs_prefetch_block = InvalidBlockNumber;
+			scan->rs_inited = false;
+			return 0;
+		}
+
+		Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
+		heap_prepare_pagescan((TableScanDesc) scan);
+
+		/* After prepare, either rs_ntuples > 0 or we'll loop again. */
+		if (scan->rs_ntuples > 0)
+		{
+			lineindex = 0;
+			linesleft = scan->rs_ntuples;
+			break;
+		}
+		/* else: page had no visible tuples; continue to next page */
+	}
+
+	/* From here on, we must only read tuples from this single page. */
+	page = BufferGetPage(scan->rs_cbuf);
+
+	/*
+	 * Walk rs_vistuples[] from 'lineindex', copying headers into tdata[]
+	 * until either the page is exhausted or the batch capacity is reached.
+	 */
+	for (; linesleft > 0 && nout < maxitems; linesleft--, lineindex += dir)
+	{
+		OffsetNumber	lineoff;
+		ItemId			lpp;
+		HeapTupleData *dst = &tdata[nout];
+
+		Assert(lineindex <= (uint32) scan->rs_ntuples);
+		lineoff = scan->rs_vistuples[lineindex];
+		lpp = PageGetItemId(page, lineoff);
+		Assert(ItemIdIsNormal(lpp));
+
+		dst->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+		dst->t_len = ItemIdGetLength(lpp);
+		dst->t_tableOid = tableOid;
+		ItemPointerSet(&(dst->t_self), scan->rs_cblock, lineoff);
+
+		/*
+		 * Remember how far we have scanned on this page even when the key
+		 * test rejects the tuple, so that the next call does not revisit
+		 * already-rejected line pointers.
+		 */
+		scan->rs_cindex = lineindex;
+
+		if (key != NULL && !HeapKeyTest(dst, tupdesc, nkeys, key))
+			continue;
+
+		nout++;
+	}
+
+	/*
+	 * If every remaining tuple on this page was rejected by the key test,
+	 * the scan is not necessarily over; go back and try the next page
+	 * rather than returning 0, which the caller treats as end-of-scan.
+	 */
+	if (nout == 0)
+		goto next_page;
+
+	return nout;
+}
 
 /* ----------------------------------------------------------------
  *					 heap access method interface
@@ -1436,6 +1562,97 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s
 	return true;
 }
 
+/*---------- Batching support -----------*/
+
+/*
+ * heap_begin_batch
+ *
+ * Allocate a HeapBatch with space for 'maxitems' tuple headers. No pin is
+ * taken here. Memory is allocated in the caller's current memory context.
+ */
+void *
+heap_begin_batch(TableScanDesc sscan, int maxitems)
+{
+	HeapBatch  *hb;
+	Oid			relid;
+
+	Assert(maxitems > 0);
+
+	hb = palloc(sizeof(HeapBatch));
+	hb->tupdata = palloc(sizeof(HeapTupleData) * maxitems);
+	hb->maxitems = maxitems;
+	hb->nitems = 0;
+	hb->buf = InvalidBuffer;
+
+	/* Initialize static fields of HeapTupleData. Row bodies remain on page. */
+	relid = RelationGetRelid(sscan->rs_rd);
+	for (int i = 0; i < maxitems; i++)
+		hb->tupdata[i].t_tableOid = relid;
+
+	return hb;
+}
+
+/*
+ * heap_end_batch
+ *
+ * Release any outstanding pin and free the batch allocations. Caller will
+ * not use 'am_batch' after this point.
+ */
+void
+heap_end_batch(TableScanDesc sscan, void *am_batch)
+{
+	HeapBatch *hb = (HeapBatch *) am_batch;
+
+	if (BufferIsValid(hb->buf))
+		ReleaseBuffer(hb->buf);
+
+	pfree(hb->tupdata);
+	pfree(hb);
+}
+
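+/*
+ * heap_getnextbatch
+ *
+ * Fill 'am_batch' with up to maxitems tuples taken from a single heap page,
+ * advancing the scan to the next page as needed. Returns the number of
+ * tuples placed in the batch; 0 means end of scan. An extra pin on the
+ * source page is held in the batch until the next call or heap_end_batch(),
+ * so that the t_data pointers stay valid.
+ */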
+int
+heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir)
+{
+	HeapScanDesc scan = (HeapScanDesc) sscan;
+	HeapBatch  *hb = (HeapBatch *) am_batch;
+	Buffer		curbuf;
+	int			n;
+
+	Assert(ScanDirectionIsForward(dir));
+	Assert(sscan->rs_flags & SO_ALLOW_PAGEMODE);
+	Assert(hb->maxitems > 0);
+
+	/* Drop prior batch pin, if any. */
+	if (BufferIsValid(hb->buf))
+	{
+		ReleaseBuffer(hb->buf);
+		hb->buf = InvalidBuffer;
+	}
+
+	hb->nitems = 0;
+
+	/* Fill the batch; a single call never crosses a page boundary. */
+	n = heapgettup_pagemode_batch(scan, dir,
+								  sscan->rs_nkeys, sscan->rs_key,
+								  hb->tupdata, hb->maxitems);
+
+	if (n == 0)
+		return 0;	/* end of scan */
+
+	/* Hold an extra pin for the batch lifetime so t_data stays valid. */
+	curbuf = scan->rs_cbuf;
+	IncrBufferRefCount(curbuf);
+	hb->buf = curbuf;
+
+	/* Count the whole batch in pgstat with a single update. */
+	pgstat_count_heap_getnext_batch(sscan->rs_rd, n);
+
+	hb->nitems = n;
+	return n;
+}
+
+/*----- End of batching support -----*/
+
 void
 heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
 				  ItemPointer maxtid)
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index dd4fe6bf62f..550b788553c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2623,6 +2623,10 @@ static const TableAmRoutine heapam_methods = {
 	.scan_rescan = heap_rescan,
 	.scan_getnextslot = heap_getnextslot,
 
+	.scan_begin_batch = heap_begin_batch,
+	.scan_getnextbatch = heap_getnextbatch,
+	.scan_end_batch = heap_end_batch,
+
 	.scan_set_tidrange = heap_set_tidrange,
 	.scan_getnextslot_tidrange = heap_getnextslot_tidrange,
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index f7e4ae3843c..f6675043fb3 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -101,6 +101,19 @@ typedef struct HeapScanDescData
 } HeapScanDescData;
 typedef struct HeapScanDescData *HeapScanDesc;
 
+/*
+ * HeapBatch -- heapam-private batch state returned by heap_begin_batch().
+ * A batch pins one page and exposes up to maxitems HeapTupleData headers
+ * whose t_data point into that page; it is refilled by heap_getnextbatch()
+ * and released by heap_end_batch().
+ */
+typedef struct HeapBatch
+{
+	HeapTupleData  *tupdata;	/* len = maxitems; headers only */
+	int				nitems;		/* tuples produced in last getnextbatch() */
+	int				maxitems;	/* fixed capacity set at begin_batch() */
+	Buffer			buf;		/* single pinned buffer for this batch */
+} HeapBatch;
+
 typedef struct BitmapHeapScanDescData
 {
 	HeapScanDescData rs_heap_base;
@@ -337,6 +350,11 @@ extern void heap_endscan(TableScanDesc sscan);
 extern HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction);
 extern bool heap_getnextslot(TableScanDesc sscan,
 							 ScanDirection direction, TupleTableSlot *slot);
+
+extern void *heap_begin_batch(TableScanDesc sscan, int maxitems);
+extern void heap_end_batch(TableScanDesc sscan, void *am_batch);
+extern int heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir);
+
 extern void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
 							  ItemPointer maxtid);
 extern bool heap_getnextslot_tidrange(TableScanDesc sscan,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 2fa790b6bf5..3ec3c3dd008 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -351,6 +351,16 @@ typedef struct TableAmRoutine
 									 ScanDirection direction,
 									 TupleTableSlot *slot);
 
+	/* ------------------------------------------------------------------------
+	 * Batched scan support
+	 * ------------------------------------------------------------------------
+	 */
+
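+	/*
+	 * Batched variants of scan_getnextslot(): scan_begin_batch allocates
+	 * AM-private batch state with room for up to 'maxitems' tuples,
+	 * scan_getnextbatch refills that state and returns the number of tuples
+	 * produced (0 means end of scan), and scan_end_batch releases whatever
+	 * resources the batch state still holds.
+	 */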
+	void	   *(*scan_begin_batch)(TableScanDesc sscan, int maxitems);
+	int			(*scan_getnextbatch)(TableScanDesc sscan, void *am_batch,
+									 ScanDirection dir);
+	void		(*scan_end_batch)(TableScanDesc sscan, void *am_batch);
+
 	/*-----------
 	 * Optional functions to provide scanning for ranges of ItemPointers.
 	 * Implementations must either provide both of these functions, or neither
@@ -1036,6 +1046,54 @@ table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableS
 	return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
 }
 
+/*
+ * table_scan_begin_batch
+ *		Allocate AM-owned batch payload with capacity 'maxitems'.
+ */
+static inline void *
+table_scan_begin_batch(TableScanDesc sscan, int maxitems)
+{
+	const TableAmRoutine *tam = sscan->rs_rd->rd_tableam;
+
+	Assert(tam->scan_begin_batch != NULL);
+
+	return tam->scan_begin_batch(sscan, maxitems);
+}
+
+/*
+ * table_scan_getnextbatch
+ *		Fill the next batch from the AM. Returns the number of tuples
+ *		produced; 0 means end of scan. For now, batches never cross a page
+ *		boundary and only forward scans are supported.
+ */
+static inline int
+table_scan_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir)
+{
+	const TableAmRoutine *tam = sscan->rs_rd->rd_tableam;
+
+	/* Only forward scans are supported in the batched mode. */
+	Assert(dir == ForwardScanDirection);
+	Assert(tam->scan_getnextbatch != NULL);
+
+	return tam->scan_getnextbatch(sscan, am_batch, dir);
+}
+
+/*
+ * table_scan_end_batch
+ *		Release AM-owned resources for the batch payload.
+ */
+static inline void
+table_scan_end_batch(TableScanDesc sscan, void *am_batch)
+{
+	const TableAmRoutine *tam = sscan->rs_rd->rd_tableam;
+
+	if (am_batch == NULL)
+		return;
+
+	Assert(tam->scan_end_batch != NULL);
+
+	tam->scan_end_batch(sscan, am_batch);
+}
+
 /* ----------------------------------------------------------------------------
  * TID Range scanning related functions.
  * ----------------------------------------------------------------------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 6714363144a..85f76dee468 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -697,6 +697,11 @@ extern void pgstat_report_analyze(Relation rel,
 		if (pgstat_should_count_relation(rel))						\
 			(rel)->pgstat_info->counts.tuples_returned++;			\
 	} while (0)
+#define pgstat_count_heap_getnext_batch(rel, n)						\
+	do {															\
+		if (pgstat_should_count_relation(rel))						\
+			(rel)->pgstat_info->counts.tuples_returned += (n);		\
+	} while (0)
 #define pgstat_count_heap_fetch(rel)								\
 	do {															\
 		if (pgstat_should_count_relation(rel))						\
-- 
2.47.3

