Hi,

On Sat, Dec 27, 2025 at 12:41 AM Xuneng Zhou <[email protected]> wrote:
>
> Hi Bilal,
>
> Thanks for your review!
>
> On Fri, Dec 26, 2025 at 6:59 PM Nazir Bilal Yavuz <[email protected]> wrote:
> >
> > Hi,
> >
> > Thank you for working on this!
> >
> > On Thu, 25 Dec 2025 at 09:34, Xuneng Zhou <[email protected]> wrote:
> > >
> > > Hi,
> > >
> > > On Thu, Dec 25, 2025 at 1:51 PM Xuneng Zhou <[email protected]> wrote:
> > > >
> > > > Hi Hackers,
> > > >
> > > > I noticed several additional paths in contrib modules, beyond [1],
> > > > that are potentially suitable for streamification:
> > > >
> > > > 1) pgstattuple — pgstatapprox.c and parts of pgstattuple_approx_internal
> > > > 2) Bloom — scan paths in blgetbitmap() and maintenance paths in 
> > > > blbulkdelete()
> > > >
> > > > The following patches streamify those code paths. No benchmarks have
> > > > been run yet.
> > > >
> > > > [1] 
> > > > https://www.postgresql.org/message-id/flat/CABPTF7UeN2o-trr9r7K76rZExnO2M4SLfvTfbUY2CwQjCekgnQ%40mail.gmail.com
> > > >
> > > > Feedback welcome.
> > > >
> > >
> > > One more in ginvacuumcleanup().
> >
> > 0001, 0002 and 0004 LGTM.
> >
> > 0003:
> >
> > +        buf = read_stream_next_buffer(stream, NULL);
> > +        if (buf == InvalidBuffer)
> > +            break;
> >
> > I think we are loosening the check here. We were sure that there were
> > no InvalidBuffers up to nblocks. The streamified version does not have
> > this check; it exits the loop the first time it sees an
> > InvalidBuffer, which may be wrong. You might want to add
> > 'Assert(p.current_blocknum == nblocks);' before read_stream_end() to
> > keep a similar check.
> >
>
> Agreed. The check has been added in v2 per your suggestion.
>
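
For clarity, the v2 loop now drains the stream and then asserts that the
callback walked the whole range. A condensed sketch of the pgstatapprox
code in the patch below (buf, stream, p, and nblocks as in the patch):

    for (;;)
    {
        buf = read_stream_next_buffer(stream, NULL);
        if (buf == InvalidBuffer)
            break;
        /* ... inspect the page ... */
        UnlockReleaseBuffer(buf);
    }

    /* every block up to nblocks must have been read or skipped */
    Assert(p.current_blocknum == nblocks);
    read_stream_end(stream);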

Two more to go:
patch 5: Streamify log_newpage_range() WAL logging path
patch 6: Streamify hash index VACUUM primary bucket page reads

Benchmarks will be conducted soon.
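
For reference, patch 6 cannot reuse block_range_read_stream_cb, because
BUCKET_TO_BLKNO() does not map bucket numbers to a contiguous block
range; it adds a small callback that translates buckets to blocks via
the cached metapage instead. Its shape, as in the patch below:

    static BlockNumber
    hash_bulkdelete_read_stream_cb(ReadStream *stream,
                                   void *callback_private_data,
                                   void *per_buffer_data)
    {
        HashBulkDeleteStreamPrivate *p = callback_private_data;

        if (p->next_bucket > p->max_bucket)
            return InvalidBlockNumber;

        /* locate the primary page via the cached metapage mapping */
        return BUCKET_TO_BLKNO(p->metap, p->next_bucket++);
    }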


--
Best,
Xuneng
From aafdfa0851ab61777e8877cfb50e3b34bf54b9b1 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sat, 27 Dec 2025 00:28:49 +0800
Subject: [PATCH v2 2/4] Streamify Bloom VACUUM paths

Use streaming reads in blbulkdelete() and blvacuumcleanup() to iterate
index pages without repeated ReadBuffer calls, improving VACUUM
performance and reducing buffer manager overhead during maintenance
operations.

---
 contrib/bloom/blvacuum.c | 55 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 51 insertions(+), 4 deletions(-)

diff --git a/contrib/bloom/blvacuum.c b/contrib/bloom/blvacuum.c
index e68a9008f56..7452302f022 100644
--- a/contrib/bloom/blvacuum.c
+++ b/contrib/bloom/blvacuum.c
@@ -17,6 +17,7 @@
 #include "commands/vacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/indexfsm.h"
+#include "storage/read_stream.h"
 
 
 /*
@@ -40,6 +41,8 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	Page		page;
 	BloomMetaPageData *metaData;
 	GenericXLogState *gxlogState;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	if (stats == NULL)
 		stats = palloc0_object(IndexBulkDeleteResult);
@@ -51,6 +54,25 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	 * they can't contain tuples to delete.
 	 */
 	npages = RelationGetNumberOfBlocks(index);
+
+	/* Scan all blocks except the metapage using streaming reads */
+	p.current_blocknum = BLOOM_HEAD_BLKNO;
+	p.last_exclusive = npages;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_FULL |
+										READ_STREAM_USE_BATCHING,
+										info->strategy,
+										index,
+										MAIN_FORKNUM,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
 	{
 		BloomTuple *itup,
@@ -59,8 +81,7 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 
 		vacuum_delay_point(false);
 
-		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
-									RBM_NORMAL, info->strategy);
+		buffer = read_stream_next_buffer(stream, NULL);
 
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		gxlogState = GenericXLogStart(index);
@@ -133,6 +154,9 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 		UnlockReleaseBuffer(buffer);
 	}
 
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
+
 	/*
 	 * Update the metapage's notFullPage list with whatever we found.  Our
 	 * info could already be out of date at this point, but blinsert() will
@@ -166,6 +190,8 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	Relation	index = info->index;
 	BlockNumber npages,
 				blkno;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	if (info->analyze_only)
 		return stats;
@@ -181,6 +207,25 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	stats->num_pages = npages;
 	stats->pages_free = 0;
 	stats->num_index_tuples = 0;
+
+	/* Scan all blocks except the metapage using streaming reads */
+	p.current_blocknum = BLOOM_HEAD_BLKNO;
+	p.last_exclusive = npages;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_FULL |
+										READ_STREAM_USE_BATCHING,
+										info->strategy,
+										index,
+										MAIN_FORKNUM,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
 	{
 		Buffer		buffer;
@@ -188,8 +233,7 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 
 		vacuum_delay_point(false);
 
-		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
-									RBM_NORMAL, info->strategy);
+		buffer = read_stream_next_buffer(stream, NULL);
 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
 		page = BufferGetPage(buffer);
 
@@ -206,6 +250,9 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 		UnlockReleaseBuffer(buffer);
 	}
 
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
+
 	IndexFreeSpaceMapVacuum(info->index);
 
 	return stats;
-- 
2.51.0

From d628f3a5cab752d57332865c323479795629fb28 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sat, 27 Dec 2025 00:29:09 +0800
Subject: [PATCH v2 4/4] Streamify ginvacuumcleanup()

Replace the synchronous ReadBufferExtended() loop with a streaming read
in ginvacuumcleanup() to improve I/O efficiency during GIN index vacuum
cleanup.

---
 src/backend/access/gin/ginvacuum.c | 28 ++++++++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/gin/ginvacuum.c b/src/backend/access/gin/ginvacuum.c
index d7baf7c847c..58e05c71256 100644
--- a/src/backend/access/gin/ginvacuum.c
+++ b/src/backend/access/gin/ginvacuum.c
@@ -22,6 +22,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "storage/read_stream.h"
 #include "utils/memutils.h"
 
 struct GinVacuumState
@@ -693,6 +694,8 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	BlockNumber totFreePages;
 	GinState	ginstate;
 	GinStatsData idxStat;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	/*
 	 * In an autovacuum analyze, we want to clean up pending insertions.
@@ -743,6 +746,24 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 
 	totFreePages = 0;
 
+	/* Scan all blocks starting from the root using streaming reads */
+	p.current_blocknum = GIN_ROOT_BLKNO;
+	p.last_exclusive = npages;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_FULL |
+										READ_STREAM_USE_BATCHING,
+										info->strategy,
+										index,
+										MAIN_FORKNUM,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	for (blkno = GIN_ROOT_BLKNO; blkno < npages; blkno++)
 	{
 		Buffer		buffer;
@@ -750,8 +771,8 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 
 		vacuum_delay_point(false);
 
-		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
-									RBM_NORMAL, info->strategy);
+		buffer = read_stream_next_buffer(stream, NULL);
+
 		LockBuffer(buffer, GIN_SHARE);
 		page = BufferGetPage(buffer);
 
@@ -776,6 +797,9 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 		UnlockReleaseBuffer(buffer);
 	}
 
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
+
 	/* Update the metapage with accurate page and entry counts */
 	idxStat.nTotalPages = npages;
 	ginUpdateStats(info->index, &idxStat, false);
-- 
2.51.0

From 314f9cdf6d8a62cc8523b377b8cf19df646b9913 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sat, 27 Dec 2025 00:28:10 +0800
Subject: [PATCH v2 1/4] Switch Bloom scan paths to streaming reads

Replace the per-page ReadBuffer loop in blgetbitmap() with
read_stream_begin_relation() and sequential buffer iteration, reducing
buffer churn and improving scan efficiency on large Bloom indexes.

---
 contrib/bloom/blscan.c | 29 +++++++++++++++++++++++++----
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/contrib/bloom/blscan.c b/contrib/bloom/blscan.c
index 0d71edbe91c..b1fdabaab74 100644
--- a/contrib/bloom/blscan.c
+++ b/contrib/bloom/blscan.c
@@ -17,6 +17,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
+#include "storage/read_stream.h"
 
 /*
  * Begin scan of bloom index.
@@ -75,11 +76,13 @@ int64
 blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 {
 	int64		ntids = 0;
-	BlockNumber blkno = BLOOM_HEAD_BLKNO,
+	BlockNumber blkno,
 				npages;
 	int			i;
 	BufferAccessStrategy bas;
 	BloomScanOpaque so = (BloomScanOpaque) scan->opaque;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	if (so->sign == NULL)
 	{
@@ -119,14 +122,29 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 	if (scan->instrument)
 		scan->instrument->nsearches++;
 
+	/* Scan all blocks except the metapage using streaming reads */
+	p.current_blocknum = BLOOM_HEAD_BLKNO;
+	p.last_exclusive = npages;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_FULL |
+										READ_STREAM_USE_BATCHING,
+										bas,
+										scan->indexRelation,
+										MAIN_FORKNUM,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
 	{
 		Buffer		buffer;
 		Page		page;
 
-		buffer = ReadBufferExtended(scan->indexRelation, MAIN_FORKNUM,
-									blkno, RBM_NORMAL, bas);
-
+		buffer = read_stream_next_buffer(stream, NULL);
 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
 		page = BufferGetPage(buffer);
 
@@ -162,6 +180,9 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 		UnlockReleaseBuffer(buffer);
 		CHECK_FOR_INTERRUPTS();
 	}
+
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
 	FreeAccessStrategy(bas);
 
 	return ntids;
-- 
2.51.0

From 7a3fcec0cf44cd6d9848c930e0d7f620cd193698 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sat, 27 Dec 2025 00:29:02 +0800
Subject: [PATCH v2 3/4] Streamify heap bloat estimation scan

Introduce a read-stream callback that skips all-visible pages via VM/FSM
lookup and stream-reads the rest, reducing page reads and improving
pgstattuple_approx execution time on large relations.

---
 contrib/pgstattuple/pgstatapprox.c | 126 ++++++++++++++++++++++-------
 1 file changed, 95 insertions(+), 31 deletions(-)

diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index a59ff4e9d4f..cb05b530ca8 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -23,6 +23,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/procarray.h"
+#include "storage/read_stream.h"
 
 PG_FUNCTION_INFO_V1(pgstattuple_approx);
 PG_FUNCTION_INFO_V1(pgstattuple_approx_v1_5);
@@ -45,6 +46,61 @@ typedef struct output_type
 
 #define NUM_OUTPUT_COLUMNS 10
 
+/*
+ * Struct for statapprox_heap read stream callback.
+ */
+typedef struct StatApproxReadStreamPrivate
+{
+	Relation	rel;
+	output_type *stat;
+	BlockNumber current_blocknum;
+	BlockNumber nblocks;
+	BlockNumber scanned;		/* count of pages actually read */
+	Buffer		vmbuffer;		/* for VM lookups */
+}			StatApproxReadStreamPrivate;
+
+/*
+ * Read stream callback for statapprox_heap.
+ *
+ * This callback checks the visibility map for each block. If the block is
+ * all-visible, we can get the free space from the FSM without reading the
+ * actual page, and skip to the next block. Only blocks that are not
+ * all-visible are returned for actual reading.
+ */
+static BlockNumber
+statapprox_heap_read_stream_next(ReadStream *stream,
+								 void *callback_private_data,
+								 void *per_buffer_data)
+{
+	StatApproxReadStreamPrivate *p = callback_private_data;
+
+	while (p->current_blocknum < p->nblocks)
+	{
+		BlockNumber blkno = p->current_blocknum++;
+		Size		freespace;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * If the page has only visible tuples, then we can find out the free
+		 * space from the FSM and move on without reading the page.
+		 */
+		if (VM_ALL_VISIBLE(p->rel, blkno, &p->vmbuffer))
+		{
+			freespace = GetRecordedFreeSpace(p->rel, blkno);
+			p->stat->tuple_len += BLCKSZ - freespace;
+			p->stat->free_space += freespace;
+			continue;
+		}
+
+		/* This block needs to be read */
+		p->scanned++;
+		return blkno;
+	}
+
+	return InvalidBlockNumber;
+}
+
 /*
  * This function takes an already open relation and scans its pages,
  * skipping those that have the corresponding visibility map bit set.
@@ -58,53 +114,58 @@ typedef struct output_type
 static void
 statapprox_heap(Relation rel, output_type *stat)
 {
-	BlockNumber scanned,
-				nblocks,
-				blkno;
-	Buffer		vmbuffer = InvalidBuffer;
+	BlockNumber nblocks;
 	BufferAccessStrategy bstrategy;
 	TransactionId OldestXmin;
+	StatApproxReadStreamPrivate p;
+	ReadStream *stream;
 
 	OldestXmin = GetOldestNonRemovableTransactionId(rel);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
-	scanned = 0;
 
-	for (blkno = 0; blkno < nblocks; blkno++)
+	/* Initialize read stream private data */
+	p.rel = rel;
+	p.stat = stat;
+	p.current_blocknum = 0;
+	p.nblocks = nblocks;
+	p.scanned = 0;
+	p.vmbuffer = InvalidBuffer;
+
+	/*
+	 * Create the read stream. We don't use READ_STREAM_USE_BATCHING because
+	 * the callback accesses the visibility map which may need to read VM
+	 * pages. While this shouldn't cause deadlocks, we err on the side of
+	 * caution.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										bstrategy,
+										rel,
+										MAIN_FORKNUM,
+										statapprox_heap_read_stream_next,
+										&p,
+										0);
+
+	for (;;)
 	{
 		Buffer		buf;
 		Page		page;
 		OffsetNumber offnum,
 					maxoff;
-		Size		freespace;
-
-		CHECK_FOR_INTERRUPTS();
-
-		/*
-		 * If the page has only visible tuples, then we can find out the free
-		 * space from the FSM and move on.
-		 */
-		if (VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
-		{
-			freespace = GetRecordedFreeSpace(rel, blkno);
-			stat->tuple_len += BLCKSZ - freespace;
-			stat->free_space += freespace;
-			continue;
-		}
+		BlockNumber blkno;
 
-		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
-								 RBM_NORMAL, bstrategy);
+		buf = read_stream_next_buffer(stream, NULL);
+		if (buf == InvalidBuffer)
+			break;
 
 		LockBuffer(buf, BUFFER_LOCK_SHARE);
 
 		page = BufferGetPage(buf);
+		blkno = BufferGetBlockNumber(buf);
 
 		stat->free_space += PageGetExactFreeSpace(page);
 
-		/* We may count the page as scanned even if it's new/empty */
-		scanned++;
-
 		if (PageIsNew(page) || PageIsEmpty(page))
 		{
 			UnlockReleaseBuffer(buf);
@@ -169,6 +230,9 @@ statapprox_heap(Relation rel, output_type *stat)
 		UnlockReleaseBuffer(buf);
 	}
 
+	Assert(p.current_blocknum == nblocks);
+	read_stream_end(stream);
+
 	stat->table_len = (uint64) nblocks * BLCKSZ;
 
 	/*
@@ -179,7 +243,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	 * tuples in all-visible pages, so no correction is needed for that, and
 	 * we already accounted for the space in those pages, too.
 	 */
-	stat->tuple_count = vac_estimate_reltuples(rel, nblocks, scanned,
+	stat->tuple_count = vac_estimate_reltuples(rel, nblocks, p.scanned,
 											   stat->tuple_count);
 
 	/* It's not clear if we could get -1 here, but be safe. */
@@ -190,16 +254,16 @@ statapprox_heap(Relation rel, output_type *stat)
 	 */
 	if (nblocks != 0)
 	{
-		stat->scanned_percent = 100.0 * scanned / nblocks;
+		stat->scanned_percent = 100.0 * p.scanned / nblocks;
 		stat->tuple_percent = 100.0 * stat->tuple_len / stat->table_len;
 		stat->dead_tuple_percent = 100.0 * stat->dead_tuple_len / stat->table_len;
 		stat->free_percent = 100.0 * stat->free_space / stat->table_len;
 	}
 
-	if (BufferIsValid(vmbuffer))
+	if (BufferIsValid(p.vmbuffer))
 	{
-		ReleaseBuffer(vmbuffer);
-		vmbuffer = InvalidBuffer;
+		ReleaseBuffer(p.vmbuffer);
+		p.vmbuffer = InvalidBuffer;
 	}
 }
 
-- 
2.51.0

From d719cbc20c28634f07106e89b5de0c89cf098a8e Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sun, 28 Dec 2025 18:29:07 +0800
Subject: [PATCH v2 5/6] Streamify log_newpage_range() WAL logging path

Refactor log_newpage_range() to use the read stream API. This allows
prefetching of upcoming relation blocks during bulk WAL-logging
operations, overlapping I/O with CPU-intensive XLogInsert and
WAL-writing work.
---
 src/backend/access/transam/xloginsert.c | 28 +++++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index a56d5a55282..04c1a46143a 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -39,6 +39,7 @@
 #include "replication/origin.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
+#include "storage/read_stream.h"
 #include "utils/memutils.h"
 #include "utils/pgstat_internal.h"
 
@@ -1295,6 +1296,8 @@ log_newpage_range(Relation rel, ForkNumber forknum,
 {
 	int			flags;
 	BlockNumber blkno;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	flags = REGBUF_FORCE_IMAGE;
 	if (page_std)
@@ -1307,6 +1310,23 @@ log_newpage_range(Relation rel, ForkNumber forknum,
 	 */
 	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
 
+	/* Set up a streaming read for the range of blocks */
+	p.current_blocknum = startblk;
+	p.last_exclusive = endblk;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_USE_BATCHING,
+										NULL,
+										rel,
+										forknum,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	blkno = startblk;
 	while (blkno < endblk)
 	{
@@ -1321,8 +1341,10 @@ log_newpage_range(Relation rel, ForkNumber forknum,
 		nbufs = 0;
 		while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
 		{
-			Buffer		buf = ReadBufferExtended(rel, forknum, blkno,
-												 RBM_NORMAL, NULL);
+			Buffer		buf = read_stream_next_buffer(stream, NULL);
+
+			if (!BufferIsValid(buf))
+				break;
 
 			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
@@ -1361,6 +1383,8 @@ log_newpage_range(Relation rel, ForkNumber forknum,
 		}
 		END_CRIT_SECTION();
 	}
+
+	read_stream_end(stream);
 }
 
 /*
-- 
2.51.0

From 7b3018a09bfb26f8c6d1a413fd80c631231073f3 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sun, 28 Dec 2025 18:29:28 +0800
Subject: [PATCH v2 6/6] Streamify hash index VACUUM primary bucket page reads

Refactor hashbulkdelete() to use the read stream API for primary bucket
pages. This enables prefetching of upcoming buckets while the current
one is being processed, improving I/O efficiency during hash index
vacuum operations.
---
 src/backend/access/hash/hash.c | 77 +++++++++++++++++++++++++++++++++-
 1 file changed, 76 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index e388252afdc..4000d5d8e99 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -30,6 +30,7 @@
 #include "nodes/execnodes.h"
 #include "optimizer/plancat.h"
 #include "pgstat.h"
+#include "storage/read_stream.h"
 #include "utils/fmgrprotos.h"
 #include "utils/index_selfuncs.h"
 #include "utils/rel.h"
@@ -42,12 +43,23 @@ typedef struct
 	Relation	heapRel;		/* heap relation descriptor */
 } HashBuildState;
 
+/* Working state for streaming reads in hashbulkdelete */
+typedef struct
+{
+	HashMetaPage metap;			/* cached metapage for BUCKET_TO_BLKNO */
+	Bucket		next_bucket;	/* next bucket to prefetch */
+	Bucket		max_bucket;		/* stop when next_bucket > max_bucket */
+}			HashBulkDeleteStreamPrivate;
+
 static void hashbuildCallback(Relation index,
 							  ItemPointer tid,
 							  Datum *values,
 							  bool *isnull,
 							  bool tupleIsAlive,
 							  void *state);
+static BlockNumber hash_bulkdelete_read_stream_cb(ReadStream *stream,
+												  void *callback_private_data,
+												  void *per_buffer_data);
 
 
 /*
@@ -450,6 +462,25 @@ hashendscan(IndexScanDesc scan)
 	scan->opaque = NULL;
 }
 
+/*
+ * Read stream callback for hashbulkdelete.
+ *
+ * Returns the block number of the primary page for the next bucket to
+ * vacuum, using the BUCKET_TO_BLKNO mapping from the cached metapage.
+ */
+static BlockNumber
+hash_bulkdelete_read_stream_cb(ReadStream *stream,
+							   void *callback_private_data,
+							   void *per_buffer_data)
+{
+	HashBulkDeleteStreamPrivate *p = callback_private_data;
+
+	if (p->next_bucket > p->max_bucket)
+		return InvalidBlockNumber;
+
+	return BUCKET_TO_BLKNO(p->metap, p->next_bucket++);
+}
+
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples.
  * The set of target tuples is specified via a callback routine that tells
@@ -474,6 +505,8 @@ hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	Buffer		metabuf = InvalidBuffer;
 	HashMetaPage metap;
 	HashMetaPage cachedmetap;
+	HashBulkDeleteStreamPrivate stream_private;
+	ReadStream *stream = NULL;
 
 	tuples_removed = 0;
 	num_index_tuples = 0;
@@ -495,6 +528,24 @@ hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	cur_maxbucket = orig_maxbucket;
 
 loop_top:
+	/* Set up streaming read for primary bucket pages */
+	stream_private.metap = cachedmetap;
+	stream_private.next_bucket = cur_bucket;
+	stream_private.max_bucket = cur_maxbucket;
+
+	/*
+	 * It is safe to use batchmode as hash_bulkdelete_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_USE_BATCHING,
+										info->strategy,
+										rel,
+										MAIN_FORKNUM,
+										hash_bulkdelete_read_stream_cb,
+										&stream_private,
+										0);
+
 	while (cur_bucket <= cur_maxbucket)
 	{
 		BlockNumber bucket_blkno;
@@ -514,7 +565,8 @@ loop_top:
 		 * We need to acquire a cleanup lock on the primary bucket page to out
 		 * wait concurrent scans before deleting the dead tuples.
 		 */
-		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferIsValid(buf));
 		LockBufferForCleanup(buf);
 		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
 
@@ -545,6 +597,24 @@ loop_top:
 			{
 				cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
 				Assert(cachedmetap != NULL);
+
+				/*
+				 * Reset stream with updated metadata for remaining buckets.
+				 * The BUCKET_TO_BLKNO mapping depends on hashm_spares[],
+				 * which may have changed.
+				 */
+				read_stream_end(stream);
+				stream_private.metap = cachedmetap;
+				stream_private.next_bucket = cur_bucket + 1;
+				stream_private.max_bucket = cur_maxbucket;
+				stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+													READ_STREAM_USE_BATCHING,
+													info->strategy,
+													rel,
+													MAIN_FORKNUM,
+													hash_bulkdelete_read_stream_cb,
+													&stream_private,
+													0);
 			}
 		}
 
@@ -577,9 +647,14 @@ loop_top:
 		cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
 		Assert(cachedmetap != NULL);
 		cur_maxbucket = cachedmetap->hashm_maxbucket;
+		read_stream_end(stream);
 		goto loop_top;
 	}
 
+	/* Stream should be exhausted since we processed all buckets */
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
+
 	/* Okay, we're really done.  Update tuple count in metapage. */
 	START_CRIT_SECTION();
 
-- 
2.51.0
